JAF (Just Another Flow) is a powerful streaming data processing system for JSON/JSONL data with a focus on lazy evaluation, composability, and a fluent API.
- **Streaming Architecture** - Process large datasets without loading everything into memory
- **Lazy Evaluation** - Build complex pipelines that only execute when needed
- **Fluent API** - Intuitive method chaining for readable code
- **Composable** - Combine operations freely, integrate with other tools
- **Multiple Sources** - Files, directories, stdin, memory, compressed files, infinite streams
- **Unix Philosophy** - Works great with pipes and other command-line tools
```bash
pip install jaf
```
```bash
# Filter JSON data (lazy by default)
jaf filter users.jsonl '["gt?", "@age", 25]'

# Evaluate immediately
jaf filter users.jsonl '["gt?", "@age", 25]' --eval

# Chain operations
jaf filter users.jsonl '["eq?", "@status", "active"]' | \
  jaf map - "@email" | \
  jaf eval -

# Combine with other tools
jaf filter logs.jsonl '["eq?", "@level", "ERROR"]' --eval | \
  ja groupby service
```
```python
from jaf import stream

# Build a pipeline
pipeline = stream("users.jsonl") \
    .filter(["gt?", "@age", 25]) \
    .map(["dict", "name", "@name", "email", "@email"]) \
    .take(10)

# Execute when ready
for user in pipeline.evaluate():
    print(user)
```
Operations don't execute until you call `.evaluate()` or use `--eval`:
```python
# This doesn't read any data yet
pipeline = stream("huge_file.jsonl") \
    .filter(["contains?", "@tags", "important"]) \
    .map("@message")

# Now it processes data
for message in pipeline.evaluate():
    process(message)
```
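The same deferral can be illustrated with plain Python generators. This is a conceptual sketch of how lazy evaluation behaves, not JAF's internals; `lazy_filter` and `lazy_map` are hypothetical helpers introduced here for illustration:

```python
def lazy_filter(items, predicate):
    # Nothing executes until the generator is consumed
    for item in items:
        if predicate(item):
            yield item

def lazy_map(items, transform):
    for item in items:
        yield transform(item)

# Building the pipeline reads no data...
records = iter([{"level": "ERROR", "msg": "boom"},
                {"level": "INFO", "msg": "ok"}])
pipeline = lazy_map(
    lazy_filter(records, lambda r: r["level"] == "ERROR"),
    lambda r: r["msg"],
)

# ...iterating drives the whole chain, one item at a time
print(list(pipeline))  # ['boom']
```

Each stage pulls items from the previous one on demand, which is why the whole pipeline can run in constant memory.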
JAF uses S-expression syntax for queries:
```python
# Simple comparisons
["eq?", "@status", "active"]            # status == "active"
["gt?", "@age", 25]                     # age > 25
["contains?", "@tags", "python"]        # "python" in tags

# Boolean logic
["and",
  ["gte?", "@age", 18],
  ["eq?", "@verified", True]
]

# Path navigation with @
["eq?", "@user.profile.name", "Alice"]  # Nested access
["any", "@items.*.inStock"]             # Wildcard
["exists?", "@**.error"]                # Recursive search
```
- `filter` - Keep items matching a predicate
- `map` - Transform each item
- `take`/`skip` - Limit or paginate results
- `batch` - Group items into chunks
- Boolean ops - AND, OR, NOT on filtered streams
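As a rough illustration of how an operation like batching can group a stream into fixed-size chunks without materializing it, here is a generator sketch (illustrative only, not JAF's implementation):

```python
from itertools import islice

def batch(items, size):
    # Yield lists of up to `size` items, consuming the source lazily
    it = iter(items)
    while chunk := list(islice(it, size)):
        yield chunk

print(list(batch(range(7), 3)))  # [[0, 1, 2], [3, 4, 5], [6]]
```

The final chunk may be smaller than `size`; an empty source yields nothing.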
- Getting Started - Installation and first steps
- API Guide - Complete Python API reference
- Query Language - Query syntax and operators
- CLI Reference - Command-line usage
- Cookbook - Practical examples
```python
# Find errors in specific services
errors = stream("app.log.jsonl") \
    .filter(["and",
        ["eq?", "@level", "ERROR"],
        ["in?", "@service", ["api", "auth"]]
    ]) \
    .map(["dict",
        "time", "@timestamp",
        "service", "@service",
        "message", "@message"
    ]) \
    .evaluate()
```
```python
# Find invalid records
invalid = stream("users.jsonl") \
    .filter(["or",
        ["not", ["exists?", "@email"]],
        ["not", ["regex-match?", "@email", "^[^@]+@[^@]+\\.[^@]+$"]]
    ]) \
    .evaluate()
```
```python
# Transform and filter data
pipeline = stream("raw_sales.jsonl") \
    .filter(["eq?", "@status", "completed"]) \
    .map(["dict",
        "date", ["date", "@timestamp"],
        "amount", "@amount",
        "category", ["if", ["gt?", "@amount", 1000], "high", "low"]
    ]) \
    .batch(1000)

# Process in chunks
for batch in pipeline.evaluate():
    bulk_insert(batch)
```
JAF works seamlessly with other tools:
```bash
# With jsonl-algebra
jaf filter orders.jsonl '["gt?", "@amount", 100]' --eval | \
  ja groupby customer_id --aggregate 'total:amount:sum'

# With jq
jaf filter data.jsonl '["exists?", "@metadata"]' --eval | \
  jq '.metadata'

# With standard Unix tools
jaf map users.jsonl "@email" --eval | sort | uniq -c
```
JAF is designed for streaming large datasets:
- Processes one item at a time
- Minimal memory footprint
- Early termination (e.g., with `take`)
- Efficient pipeline composition
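Early termination means that taking the first n items stops reading the source after n items, no matter how large it is. A counter-based sketch, with `itertools.islice` standing in for `take` (illustrative only):

```python
from itertools import islice

consumed = 0

def source():
    # Simulates a huge lazy source; counts how many items were actually produced
    global consumed
    for i in range(1_000_000):
        consumed += 1
        yield {"n": i}

# Take the first 5 items; the remaining ~999,995 are never produced
first_five = list(islice(source(), 5))
print(len(first_five), consumed)  # 5 5
```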
Contributions are welcome! Please read our Contributing Guide for details.
JAF is licensed under the MIT License. See LICENSE for details.
- jsonl-algebra - Relational operations on JSONL
- jq - Command-line JSON processor