
yggdrasil.arrow.cast

Arrow conversion entry points with native fast-path preference.

Design principles

  1. Cast in the source engine, then serialize. Polars/Spark/pandas each have engine-native cast machinery exposed on CastOptions as cast_polars / cast_spark / cast_pandas. We use those before the Arrow conversion — it's faster (no round-trip rebuild), preserves engine-specific dtypes (polars Categoricals, pandas ExtensionArrays), and lets lazy engines push the cast into their query plan. Arrow-side casting is reserved for sources that are already Arrow or have no native cast path.

  2. Bulk over iterate. Vectorized native methods over per-row iteration. The streaming entry point is reserved for sources that are themselves streams, or for materialized tables that need chunking via options.row_size / options.byte_size.

  3. One streaming pipeline. Per-batch Arrow cast and byte_size / row_size rechunking are owned by the nested struct helpers in yggdrasil.data.types.nested.struct_arrow (reachable via CastOptions.cast_arrow_batch_iterator). Every streaming entry point here flattens its input to pa.RecordBatch and hands it off to that pipeline — no parallel chunkers in this module.

  4. Bind source schemas, don't peek. When we infer a source schema, we bind it onto options.source_field so it propagates downstream and drives CastOptions.need_cast without re-inference.

  5. Emit the merged schema. When both source and target are bound, the output schema is CastOptions.merged_schema — reconciled per schema_mode. RecordBatchReader / iterator declarations use this.

  6. Honor every options knob. column_names projects on the way in; arrow_memory_pool threads through pyarrow allocators; safe flows into engine cast methods; row_size / byte_size drive output chunking via the nested rechunker.

default_arrow_scalar

default_arrow_scalar(
    dtype: Union[DataType, ListType, MapType, StructType, FixedSizeListType],
    nullable: bool,
)

Return a default scalar for a given Arrow type.

Parameters:

  • dtype (Union[DataType, ListType, MapType, StructType, FixedSizeListType], required): Arrow data type.
  • nullable (bool, required): Whether the scalar should be nullable.

Returns:

  • Arrow scalar default.

get_arrow_nbytes

get_arrow_nbytes(obj: Any, default: int = 0) -> int

Best-effort byte size of an Arrow object.

obj.nbytes is the fast path but raises NotImplementedError on arrays containing variadic-buffer types (string_view, binary_view, list-view variants) in current pyarrow, because there's no single accumulator for the variadic data buffers. We fall back through:

  1. Container recursion — Table / RecordBatch over columns, ChunkedArray over chunks.
  2. Direct buffer walk — sum buf.size for every non-null buffer returned by Array.buffers(). For view types this naturally picks up validity + views + variadic data buffers.
  3. default — last resort, never raises.

Always returns an int; never propagates exceptions from Arrow internals (the caller is sizing batches for chunking, not doing accounting — a slightly off estimate is fine, a crash is not).

rechunk_arrow_batches_by_byte_size

rechunk_arrow_batches_by_byte_size(
    batches: Iterable[RecordBatch],
    *,
    byte_size: int | None = None,
    row_size: int | None = None,
    memory_pool: MemoryPool | None = None
) -> Iterator[pa.RecordBatch]

Stream-coalesce/slice batches to ~byte_size bytes / row_size rows.

Both knobs are optional:

  • Neither set → passthrough.
  • row_size only → emit fixed-size chunks of at most row_size rows; no buffering, zero-copy slices.
  • byte_size only → emit ~byte_size-byte chunks using the per-segment bytes/row ratio to derive a row target.
  • Both set → byte_size drives the row target; row_size caps it (final target_rows = min(row_size, derived)).

Byte sizing routes through get_arrow_nbytes so view-typed arrays (string_view / binary_view) — which raise from RecordBatch.nbytes in current pyarrow — fall back to a buffer walk instead of crashing the rechunker.

Algorithm (byte_size path):

  • Empty incoming batch → drop (no schema gymnastics on zero-row flushes — the consumer already saw a schema in an earlier batch or will get one from the upstream reader).
  • Buffer empty + incoming batch already at/over target → slice it directly into target-sized chunks (zero-copy).
  • Otherwise accumulate; once buffered nbytes crosses the target, concat + slice the buffer to target-sized chunks. Yield everything except a possibly-undersized tail; carry the tail forward.
  • On exhaustion → flush whatever is left as a single concat'd batch (may be smaller than byte_size).

any_to_arrow_table

any_to_arrow_table(obj: Any, options: Optional[CastOptions] = None) -> pa.Table

Convert any supported object to a pa.Table, with engine-native casting applied upstream when possible.

Casting strategy:

  • Arrow inputs (Table/RecordBatch/Array/Reader) — cast on the Arrow side via CastOptions.cast_arrow_tabular, with the need_cast skip optimization.
  • Pandas / Spark / Polars inputs — cast in-engine first via cast_pandas / cast_spark / cast_polars, then serialize to Arrow. The serialized table needs no further cast.
  • Generic Python inputs — wrapped via pl.DataFrame(obj) to get a polars cast path.

any_to_arrow_record_batch

any_to_arrow_record_batch(
    obj: Any, options: Optional[CastOptions] = None
) -> pa.RecordBatch

Convert to a single pa.RecordBatch.

any_to_arrow_batch_iterator

any_to_arrow_batch_iterator(
    obj: Any, options: Optional[CastOptions] = None
) -> Iterator[pa.RecordBatch]

Convert any supported object to a lazy iterator of pa.RecordBatch.

Per-batch Arrow cast and byte_size / row_size rechunking are owned by CastOptions.cast_arrow_batch_iterator, which delegates to the nested struct rechunker. The job here is to produce the source batch stream — engine-native casts happen upstream when an engine (polars / spark / pandas) owns the data.

For Polars LazyFrame and Spark DataFrame, the engine-native cast is applied before serialization, so the rechunker sees already-cast batches and skips per-batch Arrow rework.

any_to_arrow_record_batch_reader

any_to_arrow_record_batch_reader(
    obj: Any, options: Optional[CastOptions] = None
) -> pa.RecordBatchReader

Wrap any_to_arrow_batch_iterator behind a RecordBatchReader.

Output schema: merged_schema → target_schema → first-batch peek.

any_to_arrow_scalar

any_to_arrow_scalar(
    scalar: Any, options: Optional[CastOptions] = None
) -> pa.Scalar

Convert a Python value to an Arrow scalar, then cast to target type.

cast_arrow_scalar

cast_arrow_scalar(
    scalar: Scalar, options: Optional[CastOptions] = None
) -> pa.Scalar

Cast an Arrow scalar via the array path.

cast_arrow_array

cast_arrow_array(
    array: Union[ChunkedArray, Array], options: Optional[CastOptions] = None
) -> Union[pa.ChunkedArray, pa.Array]

Cast a pyarrow Array/ChunkedArray.

cast_arrow_tabular

cast_arrow_tabular(
    data: Union[Table, RecordBatch], options: Optional[CastOptions] = None
) -> Union[pa.Table, pa.RecordBatch]

Cast pyarrow Table/RecordBatch with skip-cast on schema match.

cast_arrow_record_batch_reader

cast_arrow_record_batch_reader(
    data: RecordBatchReader, options: Optional[CastOptions] = None
) -> pa.RecordBatchReader

Lazily wrap a RecordBatchReader with on-the-fly cast.

Pure passthrough when the source schema matches target and no chunking is requested.