Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.6.0
rev: v5.0.0
hooks:
- id: check-yaml
- id: check-added-large-files
Expand All @@ -23,6 +23,6 @@ repos:
pass_filenames: false

- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.4.2
rev: v0.6.0
hooks:
- id: ruff
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

### New Features

- Reimplemented `head_option`/`first_option` and `last_option` to return an `Option` instead of `None`
- Added `first_or_none`, a function to match `head_or_none`
- Added run_test.sh script
- Added [parametrize](https://pypi.org/project/parametrize/) for parameterized unit tests
Expand Down
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -382,8 +382,10 @@ complete documentation reference
| `slice(start, until)` | Sequence starting at `start` and including elements up to `until` | transformation |
| `head(no_wrap=None)` / `first(no_wrap=None)` | Returns first element in sequence (if `no_wrap=True`, the result will never be wrapped with `Sequence`) | action |
| `head_or_none(no_wrap=None)` / `first_or_none(no_wrap=None)` | Returns first element in sequence or `None` if it's empty (if `no_wrap=True`, the result will never be wrapped with `Sequence`) | action |
| `head_option()` / `first_option()` | Returns an `Option` containing the first element in the sequence or `None` if it's empty | action |
| `last(no_wrap=None)` | Returns last element in sequence (if `no_wrap=True`, the result will never be wrapped with `Sequence`) | action |
| `last_or_none(no_wrap=None)` | Returns last element in sequence or `None` if it's empty (if `no_wrap=True`, the result will never be wrapped with `Sequence`) | action |
| `last_option()` | Returns an `Option` containing the last element in the sequence or `None` if it's empty | action |
| `len()` / `size()` | Returns length of sequence | action |
| `count(func)` | Returns count of elements in sequence where `func(element)` is True | action |
| `empty()` | Returns `True` if the sequence has zero length | action |
Expand Down
175 changes: 173 additions & 2 deletions functional/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
"""

from __future__ import annotations
from dataclasses import dataclass
from operator import mul, add
import collections
from functools import reduce, wraps, partial
Expand All @@ -12,9 +13,8 @@
import sqlite3
import re

from typing import TYPE_CHECKING
import typing
from typing import NamedTuple, TypeVar, Generic, overload
from typing import NamedTuple, TypeVar, Generic, overload, TYPE_CHECKING

from tabulate import tabulate

Expand Down Expand Up @@ -329,6 +329,21 @@ def head_or_none(self, no_wrap: bool | None = None) -> _T_co | None:
return None
return self.head(no_wrap=no_wrap)

def head_option(self):
"""
Returns an Option with the head of the sequence,
or an empty Option if sequence is empty.

>>> seq([1, 2, 3]).head_option()
Option(1)

>>> seq([]).head_option()
Option(None)

:return: Option containing the head or None
"""
return Option(None if self.empty() else self.head_or_none(True))

def first_or_none(self, no_wrap: bool | None = None) -> _T_co | None:
"""
Returns the first element of the sequence or None, if the sequence is empty.
Expand All @@ -344,6 +359,21 @@ def first_or_none(self, no_wrap: bool | None = None) -> _T_co | None:
"""
return self.head_or_none(no_wrap=no_wrap)

def first_option(self):
"""
Returns an Option with the first element of the sequence,
or an empty Option if sequence is empty.

>>> seq([1, 2, 3]).head_option()
Option(1)

>>> seq([]).head_option()
Option(None)

:return: Option containing the first element or None
"""
return self.head_option()

def last(self, no_wrap: bool | None = None) -> _T_co:
"""
Returns the last element of the sequence.
Expand Down Expand Up @@ -383,6 +413,21 @@ def last_or_none(self, no_wrap: bool | None = None) -> _T_co | None:
return None
return self.last(no_wrap=no_wrap)

def last_option(self):
"""
Returns an Option with the last element of the sequence,
or an empty Option if the sequence is empty.

>>> seq([1, 2, 3]).head_option()
Option(3)

>>> seq([]).head_option()
Option(None)

:return: Option containing the last element or None
"""
return Option(None if self.empty() else self.last_or_none(True))

def init(self) -> Sequence[_T_co]:
"""
Returns the sequence, without its last element.
Expand Down Expand Up @@ -2090,6 +2135,132 @@ def tabulate(
)


@dataclass
class Option(Generic[_T_co]):
"""
An Option is a container for a single value, or None if no value is available.
When an Option contains a value, it provides a number of functions from Sequence
that can be executed on that value.
"""

value: _T_co
seq: Sequence[_T_co]

def __init__(self, value: _T_co):
self.value = value
if value is None:
self.seq = Sequence[_T_co]([])
elif isinstance(value, list):
self.seq = Sequence(value)
elif isinstance(value, Sequence):
self.seq = value
else:
self.seq = Sequence[_T_co]([value])

def __str__(self):
return f"Option({self.value})"

def __repr__(self):
return self.__str__()

def map(self, func: Callable[[_T_co], _T]) -> Option[_T]:
"""
Maps func onto this Option's value, or returns this Option if empty.

>>> Option(1).map(lambda x: x * -1)
Option(-1)

>>> Option(None).map(lambda x: x * -1)
Option(None)

:param func: function to map with
:return: Option with func mapped onto its value, or this Option if empty
"""
return self.seq.map(func).head_option()

def flat_map(self, func: Callable[[_T_co], Iterable[_T]]) -> Sequence[_T]:
"""
Applies func on each element of this Option's value then flattens the value,
returning a Sequence or raising an error if the Option's value is not a list or Sequence.
If Option is empty, it's returned instead.

>>> Option([[1, 2], [3, 4], [5, 6]]).flat_map(lambda x: x)
[1, 2, 3, 4, 5, 6]

>>> Option(["a", "bc", "def"]).flat_map(list)
['a', 'b', 'c', 'd', 'e', 'f']

>>> Option([[1], [2], [3]]).flat_map(lambda x: x * 2)
[1, 1, 2, 2, 3, 3]

>>> Option(None).flat_map(lambda x: x * -1)
Option(None)

:param func: function to apply to the value of this Option
:return: sequence resulting from value mapped to func then flattered,
or empty Sequence if Option is empty
:raise ValueError: if the value of Option is not a list or Sequence
"""
if not isinstance(self.value, (list, Sequence)):
raise ValueError("Single values cannot be converted to a Sequence")
return self.seq.flat_map(func)

def flatten(self) -> Sequence[_T]:
"""
Flattens the value of this Option by taking the elements of its element
and putting them into a new Sequence.

:return: value of Option as a sequence
:raise ValueError: if the value of Option is not a list or Sequence
"""
return self.flat_map(identity)

def plus(self, sequence: list | Sequence) -> Sequence:
if isinstance(sequence, Sequence):
seq = sequence
elif isinstance(sequence, list):
seq = Sequence(sequence)
else:
raise ValueError("sequence should be o list or Sequence")
return self.seq + seq

def non_empty(self):
"""
Returns True if this Option contains a value, or False if no value is contained.

:return: True if Option has a value, or False if empty
"""
return self.seq.non_empty()

def empty(self):
"""
Returns True if this Option does not contain a value, or False if a value is contained.

:return: True if Option is empty, or False if value is present
"""
return self.seq.empty()

def or_else(self, other: Any) -> _T_co:
"""
Returns this Option's value, or `other` if Option is empty.

:param other: the value to return if no value is present
:return: Option's value if present, else `other`
"""
return self.value if self.non_empty() else other

def or_raise_error(self) -> _T_co:
"""
Returns this Option's value, or raises an Error.

:return: Option's value if present
:raise ValueError: if Option is empty
"""
if self.empty():
raise ValueError("Option is empty")
return self.value


_PrimitiveT = TypeVar("_PrimitiveT", str, bool, float, complex, bytes, int)
_NamedTupleT = TypeVar("_NamedTupleT", bound=NamedTuple)
_DictT = TypeVar("_DictT", bound=dict)
Expand Down
82 changes: 79 additions & 3 deletions functional/test/test_functional.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,14 @@
from collections import namedtuple, deque
from itertools import product

from functional.pipeline import Sequence, is_iterable, _wrap, extend
from parametrize import parametrize # type: ignore

from functional.pipeline import Sequence, is_iterable, _wrap, extend, Option
from functional.transformations import name
from functional import seq, pseq

from parametrize import parametrize # type: ignore

Data = namedtuple("Data", "x y")


Expand All @@ -25,8 +29,8 @@ class TestPipeline(unittest.TestCase):
def setUp(self):
self.seq = seq

def assert_type(self, s):
self.assertTrue(isinstance(s, Sequence))
def assert_type(self, s, t: type = Sequence):
self.assertTrue(isinstance(s, t))

def assert_not_type(self, s):
self.assertFalse(isinstance(s, Sequence))
Expand Down Expand Up @@ -199,6 +203,42 @@ def test_head_or_none(self):
l = self.seq([deque(), deque()], no_wrap=True).head_or_none(no_wrap=False)
self.assert_type(l)

@parametrize(
"sequence, present, head, mapped_value, flat_mapped_value",
[
([], False, None, None, []),
([1, 2, 3, 4, 5], True, 1, 0, None),
([[[1, 2]], 3, 4, 5], True, [[1, 2]], 0, [1, 2]),
],
)
def test_head_option(
self, sequence, present, head, mapped_value, flat_mapped_value
):
head_option = self.seq(sequence).head_option()
self.assert_type(head_option, Option)
self.assertEqual(present, head_option.non_empty())
self.assertEqual(not present, head_option.empty())
self.assertEqual(head, head_option.or_else(None))
if head is None:
h = []
elif isinstance(head, list):
h = head
else:
h = [head]
s = [2, 3, 4]
self.assertEqual(self.seq(h + s), head_option.plus(s))
self.assert_type(head_option.plus(s))
self.assertEqual(mapped_value, head_option.map(lambda x: 0).or_else(None))
self.assert_type(head_option.map(lambda x: 0), Option)
if flat_mapped_value:
self.assertEqual(self.seq(flat_mapped_value), head_option.flatten())
self.assert_type(head_option.flat_map(lambda x: x))
if sequence:
self.assertEqual(head, head_option.or_raise_error())
else:
with self.assertRaises(ValueError):
head_option.or_raise_error()

def test_last(self):
l = self.seq([1, 2, 3]).map(lambda x: x)
self.assertEqual(l.last(), 3)
Expand Down Expand Up @@ -240,6 +280,42 @@ def test_last_or_none(self):
l = self.seq([deque(), deque()], no_wrap=True).last_or_none(no_wrap=False)
self.assert_type(l)

@parametrize(
"sequence, present, last, mapped_value, flat_mapped_value",
[
([], False, None, None, []),
([1, 2, 3, 4, 5], True, 5, 0, None),
([1, 2, 3, [[4, 5]]], True, [[4, 5]], 0, [4, 5]),
],
)
def test_last_option(
self, sequence, present, last, mapped_value, flat_mapped_value
):
last_option = self.seq(sequence).last_option()
self.assert_type(last_option, Option)
self.assertEqual(present, last_option.non_empty())
self.assertEqual(not present, last_option.empty())
self.assertEqual(last, last_option.or_else(None))
if last is None:
l = []
elif isinstance(last, list):
l = last
else:
l = [last]
s = [2, 3, 4]
self.assertEqual(self.seq(l + s), last_option.plus(s))
self.assert_type(last_option.plus(s))
self.assertEqual(mapped_value, last_option.map(lambda x: 0).or_else(None))
self.assert_type(last_option.map(lambda x: 0), Option)
if flat_mapped_value:
self.assertEqual(self.seq(flat_mapped_value), last_option.flatten())
self.assert_type(last_option.flat_map(lambda x: x))
if sequence:
self.assertEqual(last, last_option.or_raise_error())
else:
with self.assertRaises(ValueError):
last_option.or_raise_error()

def test_init(self):
result = self.seq([1, 2, 3, 4]).map(lambda x: x).init()
expect = [1, 2, 3]
Expand Down
6 changes: 5 additions & 1 deletion functional/test/test_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
if TYPE_CHECKING:
from typing import Any, Iterator
from functional import seq
from functional.pipeline import Sequence
from functional.pipeline import Sequence, Option
from pandas import DataFrame

def type_checking() -> None:
Expand All @@ -30,12 +30,16 @@ def type_checking() -> None:

t_head_or_none: int | None = seq([1, 2, 3]).head_or_none()

t_head_option: Option[int] = seq([1, 2, 3]).head_option()

t_first_or_none: int | None = seq([1, 2, 3]).first_or_none()

t_last: int = seq([1, 2, 3]).last()

t_last_or_none: int | None = seq([1, 2, 3]).last_or_none()

t_last_option: Option[int] = seq([1, 2, 3]).last_option()

t_init: Sequence[int] = seq([1, 2, 3]).init()

t_tail: Sequence[int] = seq([1, 2, 3]).tail()
Expand Down
Loading
Loading