📊 Chunked & Streaming Processing

Functions for processing large datasets efficiently while keeping memory usage bounded.

🎯 Overview

The chunked and streaming functions let you process datasets that don't fit in memory: serialize_chunked() splits an object into memory-bounded chunks, while StreamingSerializer writes serialized data directly to a file as it is produced.
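
A minimal sketch of both approaches (assuming datason is installed and exposes the functions documented below at the top level; file names are illustrative):

import datason

records = [{"id": i, "value": i * 0.5} for i in range(50_000)]

# Chunked: split the object into memory-bounded chunks up front
result = datason.serialize_chunked(records, chunk_size=5_000)
result.save_to_file("records.jsonl", format="jsonl")

# Streaming: write items one at a time, straight to disk
with datason.StreamingSerializer("stream.jsonl", format="jsonl") as stream:
    for record in records:
        stream.write(record)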

📦 Functions

serialize_chunked()

datason.serialize_chunked(obj: Any, chunk_size: int = 1000, config: Optional[SerializationConfig] = None, memory_limit_mb: Optional[int] = None) -> ChunkedSerializationResult

Serialize large objects in memory-bounded chunks.

This function breaks large objects (lists, DataFrames, arrays) into smaller chunks to enable processing of datasets larger than available memory.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| obj | Any | Object to serialize (typically list, DataFrame, or array) | required |
| chunk_size | int | Number of items per chunk | 1000 |
| config | Optional[SerializationConfig] | Serialization configuration | None |
| memory_limit_mb | Optional[int] | Optional memory limit in MB (not enforced yet, for future use) | None |

Returns:

| Type | Description |
| --- | --- |
| ChunkedSerializationResult | ChunkedSerializationResult with iterator of serialized chunks |

Examples:

>>> large_list = list(range(10000))
>>> result = serialize_chunked(large_list, chunk_size=100)
>>> chunks = result.to_list()  # Get all chunks
>>> len(chunks)  # 100 chunks of 100 items each
100
>>> # Save directly to file without loading all chunks
>>> result = serialize_chunked(large_data, chunk_size=1000)
>>> result.save_to_file("large_data.jsonl", format="jsonl")
Source code in datason/core_new.py
def serialize_chunked(
    obj: Any,
    chunk_size: int = 1000,
    config: Optional["SerializationConfig"] = None,
    memory_limit_mb: Optional[int] = None,
) -> ChunkedSerializationResult:
    """Serialize large objects in memory-bounded chunks.

    This function breaks large objects (lists, DataFrames, arrays) into smaller chunks
    to enable processing of datasets larger than available memory.

    Args:
        obj: Object to serialize (typically list, DataFrame, or array)
        chunk_size: Number of items per chunk
        config: Serialization configuration
        memory_limit_mb: Optional memory limit in MB (not enforced yet, for future use)

    Returns:
        ChunkedSerializationResult with iterator of serialized chunks

    Examples:
        >>> large_list = list(range(10000))
        >>> result = serialize_chunked(large_list, chunk_size=100)
        >>> chunks = result.to_list()  # Get all chunks
        >>> len(chunks)  # 100 chunks of 100 items each
        100

        >>> # Save directly to file without loading all chunks
        >>> result = serialize_chunked(large_data, chunk_size=1000)
        >>> result.save_to_file("large_data.jsonl", format="jsonl")
    """
    if config is None and _config_available:
        config = get_default_config()

    # Determine chunking strategy based on object type
    if isinstance(obj, (list, tuple)):
        return _chunk_sequence(obj, chunk_size, config)
    elif pd is not None and isinstance(obj, pd.DataFrame):
        return _chunk_dataframe(obj, chunk_size, config)
    elif np is not None and isinstance(obj, np.ndarray):
        return _chunk_numpy_array(obj, chunk_size, config)
    elif isinstance(obj, dict):
        return _chunk_dict(obj, chunk_size, config)
    else:
        # For non-chunkable objects, return single chunk
        single_chunk = serialize(obj, config)
        metadata = {
            "total_chunks": 1,
            "chunk_size": chunk_size,
            "object_type": type(obj).__name__,
            "chunking_strategy": "single_object",
        }
        return ChunkedSerializationResult(iter([single_chunk]), metadata)
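
Beyond the doctest above, here is a hedged sketch of chunking a pandas DataFrame and consuming the chunks lazily (assumes pandas is installed; the exact metadata keys for DataFrame chunking may differ from the single-object case shown in the source; handle_chunk is a hypothetical downstream consumer):

import datason
import pandas as pd

df = pd.DataFrame({"a": range(2_500), "b": range(2_500)})

result = datason.serialize_chunked(df, chunk_size=1_000)
print(result.metadata)  # chunking details (e.g. total chunks, strategy)

# Iterate lazily instead of materializing everything with to_list()
for chunk in result.chunks:
    handle_chunk(chunk)  # hypothetical downstream consumer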

ChunkedSerializationResult

datason.ChunkedSerializationResult(chunks: Iterator[Any], metadata: Dict[str, Any])

Result container for chunked serialization operations.

Initialize chunked result.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| chunks | Iterator[Any] | Iterator of serialized chunks | required |
| metadata | Dict[str, Any] | Metadata about the chunking operation | required |
Source code in datason/core_new.py
def __init__(self, chunks: Iterator[Any], metadata: Dict[str, Any]):
    """Initialize chunked result.

    Args:
        chunks: Iterator of serialized chunks
        metadata: Metadata about the chunking operation
    """
    self.chunks = chunks
    self.metadata = metadata

save_to_file(file_path: Union[str, Path], format: str = 'jsonl') -> None

Save chunks to a file.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| file_path | Union[str, Path] | Path to save the chunks | required |
| format | str | Format to save ('jsonl' for JSON lines, 'json' for array) | 'jsonl' |
Source code in datason/core_new.py
def save_to_file(self, file_path: Union[str, Path], format: str = "jsonl") -> None:
    """Save chunks to a file.

    Args:
        file_path: Path to save the chunks
        format: Format to save ('jsonl' for JSON lines, 'json' for array)
    """
    file_path = Path(file_path)

    with file_path.open("w") as f:
        if format == "jsonl":
            # JSON Lines format - one JSON object per line
            for chunk in self.chunks:
                json.dump(chunk, f, ensure_ascii=False)
                f.write("\n")
        elif format == "json":
            # JSON array format
            chunk_list = list(self.chunks)
            json.dump({"chunks": chunk_list, "metadata": self.metadata}, f, ensure_ascii=False, indent=2)
        else:
            raise ValueError(f"Unsupported format: {format}. Use 'jsonl' or 'json'")

to_list() -> list

Convert all chunks to a list (loads everything into memory).

Source code in datason/core_new.py
def to_list(self) -> list:
    """Convert all chunks to a list (loads everything into memory)."""
    return list(self.chunks)

StreamingSerializer

datason.StreamingSerializer(file_path: Union[str, Path], config: Optional[SerializationConfig] = None, format: str = 'jsonl', buffer_size: int = 8192)

Context manager for streaming serialization to files.

Enables processing of datasets larger than available memory by writing serialized data directly to files without keeping everything in memory.

Initialize streaming serializer.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| file_path | Union[str, Path] | Path to output file | required |
| config | Optional[SerializationConfig] | Serialization configuration | None |
| format | str | Output format ('jsonl' or 'json') | 'jsonl' |
| buffer_size | int | Write buffer size in bytes | 8192 |
Source code in datason/core_new.py
def __init__(
    self,
    file_path: Union[str, Path],
    config: Optional["SerializationConfig"] = None,
    format: str = "jsonl",
    buffer_size: int = 8192,
):
    """Initialize streaming serializer.

    Args:
        file_path: Path to output file
        config: Serialization configuration
        format: Output format ('jsonl' or 'json')
        buffer_size: Write buffer size in bytes
    """
    self.file_path = Path(file_path)
    self.config = config or (get_default_config() if _config_available else None)
    self.format = format
    self.buffer_size = buffer_size
    self._file: Optional[Any] = None
    self._items_written = 0
    self._json_array_started = False

__enter__() -> StreamingSerializer

Enter context manager.

Source code in datason/core_new.py
def __enter__(self) -> "StreamingSerializer":
    """Enter context manager."""
    # Check if compression is needed based on file extension
    if self.file_path.suffix == ".gz" or (
        len(self.file_path.suffixes) > 1 and self.file_path.suffixes[-1] == ".gz"
    ):
        import gzip

        self._file = gzip.open(self.file_path, "wt", encoding="utf-8")
    else:
        self._file = self.file_path.open("w", buffering=self.buffer_size)

    if self.format == "json":
        # Start JSON array
        self._file.write('{"data": [')
        self._json_array_started = True

    return self
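
Because __enter__ switches to gzip.open when the target path ends in .gz, compressed output only requires choosing an appropriate file name. A minimal sketch (path is illustrative):

import datason

# The ".gz" suffix triggers gzip-compressed output in __enter__
with datason.StreamingSerializer("events.jsonl.gz", format="jsonl") as stream:
    stream.write({"event": "start"})
    stream.write({"event": "stop"})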

__exit__(exc_type: Any, exc_val: Any, exc_tb: Any) -> None

Exit context manager.

Source code in datason/core_new.py
def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
    """Exit context manager."""
    if self._file:
        if self.format == "json" and self._json_array_started:
            # Close JSON array and add metadata
            self._file.write(f'], "metadata": {{"items_written": {self._items_written}}}}}')

        self._file.close()
        self._file = None

write(obj: Any) -> None

Write a single object to the stream.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| obj | Any | Object to serialize and write | required |
Source code in datason/core_new.py
def write(self, obj: Any) -> None:
    """Write a single object to the stream.

    Args:
        obj: Object to serialize and write
    """
    if not self._file:
        raise RuntimeError("StreamingSerializer not in context manager")

    serialized = serialize(obj, self.config)

    if self.format == "jsonl":
        # JSON Lines: one object per line
        json.dump(serialized, self._file, ensure_ascii=False)
        self._file.write("\n")
    elif self.format == "json":
        # JSON array format
        if self._items_written > 0:
            self._file.write(", ")
        json.dump(serialized, self._file, ensure_ascii=False)
    else:
        raise ValueError(f"Unsupported format: {self.format}")

    self._items_written += 1

write_chunked(obj: Any, chunk_size: int = 1000) -> None

Write a large object using chunked serialization.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| obj | Any | Large object to chunk and write | required |
| chunk_size | int | Size of each chunk | 1000 |
Source code in datason/core_new.py
def write_chunked(self, obj: Any, chunk_size: int = 1000) -> None:
    """Write a large object using chunked serialization.

    Args:
        obj: Large object to chunk and write
        chunk_size: Size of each chunk
    """
    chunked_result = serialize_chunked(obj, chunk_size, self.config)

    for chunk in chunked_result.chunks:
        self.write(chunk)
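
Putting write() and write_chunked() together, a hedged end-to-end sketch (assumes pandas is installed; file name is illustrative):

import datason
import pandas as pd

with datason.StreamingSerializer("training_data.jsonl", format="jsonl") as stream:
    # Stream small records as they are produced
    for batch_id in range(3):
        stream.write({"batch": batch_id, "status": "ok"})

    # Hand off a large object: it is split via serialize_chunked and
    # each chunk is written as its own line
    big_df = pd.DataFrame({"x": range(10_000)})
    stream.write_chunked(big_df, chunk_size=2_000)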

estimate_memory_usage()

datason.estimate_memory_usage(obj: Any, config: Optional[SerializationConfig] = None) -> Dict[str, Any]

Estimate memory usage for serializing an object.

This is a rough estimation to help users decide on chunking strategies.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| obj | Any | Object to analyze | required |
| config | Optional[SerializationConfig] | Serialization configuration | None |

Returns:

| Type | Description |
| --- | --- |
| Dict[str, Any] | Dictionary with memory usage estimates |

Examples:

>>> import pandas as pd
>>> df = pd.DataFrame({'a': range(10000), 'b': range(10000)})
>>> stats = estimate_memory_usage(df)
>>> print(f"Estimated serialized size: {stats['estimated_serialized_mb']:.1f} MB")
>>> print(f"Recommended chunk size: {stats['recommended_chunk_size']}")
Source code in datason/core_new.py
def estimate_memory_usage(obj: Any, config: Optional["SerializationConfig"] = None) -> Dict[str, Any]:
    """Estimate memory usage for serializing an object.

    This is a rough estimation to help users decide on chunking strategies.

    Args:
        obj: Object to analyze
        config: Serialization configuration

    Returns:
        Dictionary with memory usage estimates

    Examples:
        >>> import pandas as pd
        >>> df = pd.DataFrame({'a': range(10000), 'b': range(10000)})
        >>> stats = estimate_memory_usage(df)
        >>> print(f"Estimated serialized size: {stats['estimated_serialized_mb']:.1f} MB")
        >>> print(f"Recommended chunk size: {stats['recommended_chunk_size']}")
    """
    import sys

    # Get basic object size
    object_size_bytes = sys.getsizeof(obj)

    # Estimate based on object type
    if isinstance(obj, (list, tuple)) or (pd is not None and isinstance(obj, pd.DataFrame)):
        item_count = len(obj)
        estimated_item_size = object_size_bytes / max(item_count, 1)
    elif np is not None and isinstance(obj, np.ndarray):
        item_count = obj.shape[0] if obj.ndim > 0 else 1
        estimated_item_size = object_size_bytes / max(item_count, 1)
    elif isinstance(obj, dict):
        item_count = len(obj)
        estimated_item_size = object_size_bytes / max(item_count, 1)
    else:
        item_count = 1
        estimated_item_size = object_size_bytes

    # Serialization typically increases size by 1.5-3x for complex objects
    serialization_overhead = 2.0
    estimated_serialized_bytes = object_size_bytes * serialization_overhead

    # Recommend chunk size to keep chunks under 50MB
    target_chunk_size_mb = 50
    target_chunk_size_bytes = target_chunk_size_mb * 1024 * 1024

    if estimated_item_size > 0:
        recommended_chunk_size = max(1, int(target_chunk_size_bytes / (estimated_item_size * serialization_overhead)))
    else:
        recommended_chunk_size = 1000  # Default fallback

    return {
        "object_type": type(obj).__name__,
        "object_size_mb": object_size_bytes / (1024 * 1024),
        "estimated_serialized_mb": estimated_serialized_bytes / (1024 * 1024),
        "item_count": item_count,
        "estimated_item_size_bytes": estimated_item_size,
        "recommended_chunk_size": recommended_chunk_size,
        "recommended_chunks": max(1, item_count // recommended_chunk_size),
    }
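
A hedged sketch of feeding the estimate back into serialize_chunked (the estimate is approximate, so treat recommended_chunk_size as a starting point; file name is illustrative):

import datason

data = [{"feature": i, "label": i % 2} for i in range(100_000)]

stats = datason.estimate_memory_usage(data)
print(f"{stats['object_type']}: {stats['item_count']} items, "
      f"~{stats['estimated_serialized_mb']:.1f} MB serialized")

# Use the recommendation to drive chunked serialization
result = datason.serialize_chunked(data, chunk_size=stats["recommended_chunk_size"])
result.save_to_file("features.jsonl", format="jsonl")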