This is a simple and lightweight Streams API inspired by Java Streams with support for type hinting.
This package is released as tinystream on PyPI.
from tinystream import Stream

stream = Stream([1, 2, 3, 4, 5])  # Stream.of_many(*), Stream.of_dict()

# Parentheses are used for chaining because a backslash continuation
# cannot be followed by a comment.
(
    stream
    .map(lambda x: x + 1)  # flatmap(), peek(), map_key(), map_kwargs(), map_keys()
    .filter(lambda x: x > 2)  # filter_key(), filter_type()
    .sorted(reverse=True)  # sort()
    .reverse()
    .limit(2)
    .concat([4])
    .sum()  # reduce(), max(), min(), collect(), count(), find()
)
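For orientation, the chain above can be traced step by step in plain Python. This is only a sketch: it does not use tinystream, and it assumes each method behaves like its Java Streams or built-in namesake (for example, that limit(2) keeps the first two elements and concat() appends to the end).

```python
from itertools import chain, islice

data = [1, 2, 3, 4, 5]

mapped = map(lambda x: x + 1, data)          # 2, 3, 4, 5, 6
filtered = filter(lambda x: x > 2, mapped)   # 3, 4, 5, 6
ordered = sorted(filtered, reverse=True)     # [6, 5, 4, 3]
reversed_again = reversed(ordered)           # 3, 4, 5, 6
limited = islice(reversed_again, 2)          # 3, 4
concatenated = chain(limited, [4])           # 3, 4, 4
total = sum(concatenated)

print(total)  # 11 under the assumptions stated above
```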
Aggregators like sum(), count(), and max() will collect() the data and end the stream. collect() also caches the data and can be called multiple times, since it returns only a list.
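The caching behaviour described above can be pictured with a small stand-in class. CachingStream is a hypothetical name used only for illustration; this is a sketch of the idea, not tinystream's implementation.

```python
from typing import Generic, Iterable, Iterator, List, Optional, TypeVar

T = TypeVar("T")


class CachingStream(Generic[T]):
    """Hypothetical illustration, not tinystream's implementation."""

    def __init__(self, source: Iterable[T]) -> None:
        self._iterator: Iterator[T] = iter(source)
        self._cache: Optional[List[T]] = None

    def collect(self) -> List[T]:
        # Drain the iterator only once; later calls reuse the cached list.
        if self._cache is None:
            self._cache = list(self._iterator)
        return self._cache

    def count(self) -> int:
        # Aggregators build on collect(), so they also end the stream.
        return len(self.collect())


stream = CachingStream(x * 2 for x in range(3))
first = stream.collect()
second = stream.collect()  # served from the cache, the generator is not re-read
```

Because the source here is a one-shot generator, the second collect() call would return nothing without the cache.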
Some aggregators, such as sum() and max(), return an Opt:
assert Stream((1, 2, 3, 4, 5)).sum().get() == 15
You can type hint datatypes like this:
from dataclasses import dataclass

@dataclass
class Node:
    name: str
    parent: "Node" = None

parent = Node(name="B")
child = Node(name="A", parent=parent)
For lambdas, declare the result type with type():
stream = Stream([child])
assert stream.map(lambda x: x.parent).type(Node).next().get().name == "B"
This is not necessary when you pass a mapping function:
def map_parent(n: Node):
    return n.parent
assert stream.map(map_parent).next().get().name == "B"
Dictionaries are streamed as tuple(key, value):
children = {"a": Node(name="Child")}
stream = Stream.of_dict(children)
for item in stream:
    # item[0] is known as str
    # item[1] is known as Node
    pass
This is the same as (but without known types):
stream = Stream(children)
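The tuple shape matches what dict.items() yields in plain Python. The sketch below uses only the standard library and redefines Node so that it runs on its own. It illustrates the (key, value) pairs described above, without using tinystream.

```python
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class Node:
    name: str
    parent: Optional["Node"] = None


children: Dict[str, Node] = {"a": Node(name="Child")}

# dict.items() yields (key, value) tuples; with the annotation above,
# a type checker knows the key is a str and the value is a Node.
pairs = list(children.items())
key, node = pairs[0]
```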
items_with_name = Stream([child]).filter_key("name")
items_with_name = Stream([child]).filter_key_value("name", "Child")
nodes_only = Stream([child]).filter_type(Node)
names = Stream([child]).map_key("name")
# Named node_dicts to avoid shadowing the built-in list
node_dicts = [
    {"node": Node(name="Node A")},
    {"node": Node(name="Node B")},
    {"node": Node(name="Node C")},
    {"node": Node(name="Node D")},
]
names = Stream(node_dicts).map_keys("node", "name")
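A plain-Python reading of the deep mapping above might look like the following. It assumes that map_keys("node", "name") resolves each key in turn, using item lookup for dictionaries and attribute access for other objects. deep_get is a hypothetical helper written for this sketch and is not part of tinystream.

```python
from collections.abc import Mapping
from dataclasses import dataclass
from typing import Any


@dataclass
class Node:
    name: str


def deep_get(item: Any, *keys: str) -> Any:
    """Hypothetical helper: follow keys through dicts and object attributes."""
    for key in keys:
        if isinstance(item, Mapping):
            item = item[key]
        else:
            item = getattr(item, key)
    return item


records = [
    {"node": Node(name="Node A")},
    {"node": Node(name="Node B")},
]
names = [deep_get(record, "node", "name") for record in records]
```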
all_names = Stream([child]).map_key("name").join(", ")
# Named node_kwargs to avoid shadowing the built-in list
node_kwargs = [
    {"name": "Node A"},
    {"name": "Node B"},
]
# Shortcut for map(lambda x: Node(**x))
nodes = Stream(node_kwargs).map_kwargs(Node)
many = Stream.of_many([1, 2, 3], (4, 5, 6))
many = many.concat([7, 8, 9])
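In standard-library terms, the two calls above resemble itertools.chain. This sketch assumes that Stream.of_many() flattens its arguments into one sequence and that concat() appends to the end; it does not use tinystream.

```python
from itertools import chain

many = chain([1, 2, 3], (4, 5, 6))  # assumed equivalent of Stream.of_many(...)
many = chain(many, [7, 8, 9])       # assumed equivalent of .concat(...)
result = list(many)
```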
stream = Stream(["a", "b", "c"]).on_end(lambda: print("Finished"))
char = stream.next().get()
if char == "a":
    stream.end()
Get next value as Opt:
assert Stream((1, 2, 3, 4, 5)).next().present
Mapping:
assert Opt("String").map(str.lower).len == 6
Get default value:
assert Opt(None).get(6) == 6
assert Opt(None).get(lambda: 6) == 6
assert Opt(None).if_absent(lambda: 3).present
Filter value:
assert Opt(0).filter(lambda x: x > 0).absent
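The Opt calls above follow the familiar optional-value pattern. The class below is a hypothetical stand-in named MiniOpt, written only to make that pattern concrete. It assumes that a value counts as absent when it is None and that a callable default is invoked lazily; tinystream's own Opt may differ in both respects.

```python
from typing import Any, Callable, Generic, Optional, TypeVar

T = TypeVar("T")
R = TypeVar("R")


class MiniOpt(Generic[T]):
    """Hypothetical stand-in for Opt; not tinystream's implementation."""

    def __init__(self, value: Optional[T]) -> None:
        self._value = value

    @property
    def present(self) -> bool:
        return self._value is not None

    @property
    def absent(self) -> bool:
        return self._value is None

    def get(self, default: Any = None) -> Any:
        # Return the wrapped value, or the default (calling it if callable).
        if self.present:
            return self._value
        return default() if callable(default) else default

    def map(self, func: Callable[[T], R]) -> "MiniOpt[R]":
        # Apply func only when a value is present.
        return MiniOpt(func(self._value)) if self.present else MiniOpt(None)

    def filter(self, predicate: Callable[[T], bool]) -> "MiniOpt[T]":
        # Keep the value only if it is present and passes the predicate.
        if self.present and predicate(self._value):
            return self
        return MiniOpt(None)


lowered = MiniOpt("String").map(str.lower).get()
fallback = MiniOpt(None).get(lambda: 6)
rejected = MiniOpt(0).filter(lambda x: x > 0)
```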
You can also access optional index elements of the stream, but this will collect() and end the stream.
assert Stream([])[2].absent
from datetime import timedelta

data = {
    "ranges": [
        {"days": 3},
        {"weeks": 1},
    ]
}

# With tinystream Stream
for range_data in Opt(data).map_key("ranges").stream().map_kwargs(timedelta):
    pass

# Vanilla Python
if "ranges" in data:
    range_data: timedelta
    for range_data in map(lambda x: timedelta(**x), data["ranges"]):
        pass
# tinystream Opt
var = Opt(my_dict).kmap("key").filter(not_empty).get("default")
# Vanilla Python
var = my_dict["key"] if "key" in my_dict and not_empty(my_dict["key"]) else "default"
There are a couple of other implementations that fulfill similar requirements:
- https://github.com/vxgmichel/aiostream
- https://github.com/python-streamz/streamz
- https://pypi.org/project/fluentpy
- https://github.com/ramsteak/streams
- https://github.com/alemazzo/Python-Java-Stream (outdated)
- https://github.com/JaviOverflow/python-streams (outdated)
- https://github.com/9seconds/streams/ (outdated)
- https://github.com/tolsac/streampy (outdated)
- Apache Spark
Run the tests with:
PYTHONPATH="." pytest --cov=tinystream -n 4 tests/