yggdrasil.arrow.cast¶
cast ¶
Arrow conversion entry points with native fast-path preference.
Design principles¶
- Cast in the source engine, then serialize. Polars/Spark/pandas each have engine-native cast machinery exposed on :class:`CastOptions` as :meth:`cast_polars` / :meth:`cast_spark` / :meth:`cast_pandas`. We use those before the Arrow conversion — it's faster (no round-trip rebuild), preserves engine-specific dtypes (polars Categoricals, pandas ExtensionArrays), and lets lazy engines push the cast into their query plan. Arrow-side casting is reserved for sources that are already Arrow or have no native cast path. A sketch of this principle follows the list.
- Bulk over iterate. Prefer vectorized native methods over per-row iteration. The streaming entry point is reserved for sources that are themselves streams, or for materialized tables that need chunking via `options.row_size` / `options.byte_size`.
- One streaming pipeline. Per-batch Arrow cast and `byte_size` / `row_size` rechunking are owned by the nested struct helpers in :mod:`yggdrasil.data.types.nested.struct_arrow` (reachable via :meth:`CastOptions.cast_arrow_batch_iterator`). Every streaming entry point here flattens its input to `pa.RecordBatch` and hands it off to that pipeline — no parallel chunkers in this module.
- Bind source schemas, don't peek. When we infer a source schema, we bind it onto `options.source_field` so it propagates downstream and drives :meth:`CastOptions.need_cast` without re-inference.
- Emit the merged schema. When both source and target are bound, the output schema is :attr:`CastOptions.merged_schema` — reconciled per `schema_mode`. `RecordBatchReader` / iterator declarations use this.
- Honor every options knob. `column_names` projects on the way in; `arrow_memory_pool` threads through pyarrow allocators; `safe` flows into engine cast methods; `row_size` / `byte_size` drive output chunking via the nested rechunker.
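For illustration, a minimal sketch of the first principle (cast in the source engine, then serialize) using plain polars rather than the helpers documented here; the column names and dtypes are made up:

```python
import polars as pl

# Engine-native cast: stays inside the polars query plan, so the engine can
# optimize it together with the rest of the pipeline before Arrow is involved.
lf = pl.LazyFrame({"id": [1, 2, 3], "amount": ["1.5", "2.0", "3.25"]})
casted = lf.with_columns(pl.col("amount").cast(pl.Float64))

# Only now serialize: the resulting Arrow table already carries the target
# types, so no Arrow-side rebuild is needed.
table = casted.collect().to_arrow()
print(table.schema)  # amount: double
```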
default_arrow_scalar ¶
default_arrow_scalar(
dtype: Union[DataType, ListType, MapType, StructType, FixedSizeListType],
nullable: bool,
)
Return a default scalar for a given Arrow type.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `dtype` | `Union[DataType, ListType, MapType, StructType, FixedSizeListType]` | Arrow data type. | required |
| `nullable` | `bool` | Whether the scalar should be nullable. | required |

Returns:

| Type | Description |
|---|---|
|  | Arrow scalar default. |
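A possible call shape, assuming the module is importable as `yggdrasil.arrow.cast`; the exact defaults returned (a typed null for nullable fields, a type-appropriate value otherwise) are not spelled out above, so the comments only state expectations:

```python
import pyarrow as pa
from yggdrasil.arrow.cast import default_arrow_scalar

# Nullable field: one would expect a typed null, i.e. something equivalent to
# pa.scalar(None, type=pa.int64()). This is an expectation, not documented above.
print(default_arrow_scalar(pa.int64(), nullable=True))

# Non-nullable field: some concrete default of the right type is expected.
print(default_arrow_scalar(pa.string(), nullable=False))
```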
get_arrow_nbytes ¶
Best-effort byte size of an Arrow object.
obj.nbytes is the fast path but raises NotImplementedError
on arrays containing variadic-buffer types (string_view,
binary_view, list-view variants) in current pyarrow, because
there's no single accumulator for the variadic data buffers. We
fall back through:
- Container recursion — `Table` / `RecordBatch` over columns, `ChunkedArray` over chunks.
- Direct buffer walk — sum `buf.size` for every non-null buffer returned by `Array.buffers()`. For view types this naturally picks up validity + views + variadic data buffers.
- `default` — last resort, never raises.
Always returns an int; never propagates exceptions from Arrow
internals (the caller is sizing batches for chunking, not doing
accounting — a slightly off estimate is fine, a crash is not).
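A condensed sketch of that fallback chain, written against public pyarrow APIs; it illustrates the described behavior and is not the module's actual implementation (the `default` parameter is assumed):

```python
import pyarrow as pa

def arrow_nbytes_sketch(obj, default: int = 0) -> int:
    """Best-effort byte size following the fallback order described above."""
    try:
        return obj.nbytes  # fast path; raises NotImplementedError for view-typed arrays
    except NotImplementedError:
        pass
    try:
        if isinstance(obj, (pa.Table, pa.RecordBatch)):
            # Container recursion: sum over columns.
            return sum(arrow_nbytes_sketch(col, default) for col in obj.columns)
        if isinstance(obj, pa.ChunkedArray):
            # Container recursion: sum over chunks.
            return sum(arrow_nbytes_sketch(chunk, default) for chunk in obj.chunks)
        if isinstance(obj, pa.Array):
            # Direct buffer walk: validity + offsets/views + variadic data buffers.
            return sum(buf.size for buf in obj.buffers() if buf is not None)
    except Exception:
        pass
    return default  # last resort, never raises
```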
rechunk_arrow_batches_by_byte_size ¶
rechunk_arrow_batches_by_byte_size(
batches: Iterable[RecordBatch],
*,
byte_size: int | None = None,
row_size: int | None = None,
memory_pool: MemoryPool | None = None
) -> Iterator[pa.RecordBatch]
Stream-coalesce/slice batches to ~byte_size bytes / row_size rows.
Both knobs are optional:
- Neither set → passthrough.
- `row_size` only → emit fixed-size chunks of at most `row_size` rows; no buffering, zero-copy slices.
- `byte_size` only → emit ~`byte_size`-byte chunks using the per-segment bytes/row ratio to derive a row target.
- Both set → `byte_size` drives the row target; `row_size` caps it (final `target_rows = min(row_size, derived)`).
Byte sizing routes through :func:`get_arrow_nbytes` so view-typed
arrays (string_view / binary_view) — which raise from
RecordBatch.nbytes in current pyarrow — fall back to a buffer
walk instead of crashing the rechunker.
Algorithm (byte_size path):
- Empty incoming batch → drop (no schema gymnastics on zero-row flushes — the consumer already saw a schema in an earlier batch or will get one from the upstream reader).
- Buffer empty + incoming batch already at/over target → slice it directly into target-sized chunks (zero-copy).
- Otherwise accumulate; once buffered `nbytes` crosses the target, concat + slice the buffer to target-sized chunks. Yield everything except a possibly-undersized tail; carry the tail forward.
- On exhaustion → flush whatever is left as a single concat'd batch (may be smaller than `byte_size`).
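A usage sketch, assuming the functions are importable from `yggdrasil.arrow.cast` and that `get_arrow_nbytes` takes the object as its only required argument:

```python
import pyarrow as pa
from yggdrasil.arrow.cast import (
    get_arrow_nbytes,
    rechunk_arrow_batches_by_byte_size,
)

big = pa.RecordBatch.from_pydict({"x": list(range(200_000))})

# row_size only: zero-copy slices of at most 50_000 rows each.
for chunk in rechunk_arrow_batches_by_byte_size([big], row_size=50_000):
    assert chunk.num_rows <= 50_000

# byte_size only: roughly 256 KiB chunks, sized via get_arrow_nbytes.
for chunk in rechunk_arrow_batches_by_byte_size([big], byte_size=256 * 1024):
    print(chunk.num_rows, get_arrow_nbytes(chunk))
```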
any_to_arrow_table ¶
Convert any supported object to a pa.Table, with engine-native
casting applied upstream when possible.
Casting strategy:
- Arrow inputs (Table / RecordBatch / Array / Reader) — cast on the Arrow side via :meth:`CastOptions.cast_arrow_tabular`, with the `need_cast` skip optimization.
- Pandas / Spark / Polars inputs — cast in-engine first via `cast_pandas` / `cast_spark` / `cast_polars`, then serialize to Arrow. The serialized table needs no further cast.
- Generic Python inputs — wrapped via `pl.DataFrame(obj)` to get a polars cast path.
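A usage sketch with default options; the example data is made up, and the no-options call shape is inferred from the sibling signatures above rather than stated here:

```python
import pandas as pd
import pyarrow as pa
from yggdrasil.arrow.cast import any_to_arrow_table

# Pandas input: cast_pandas runs in-engine first, then the frame is serialized once.
tbl = any_to_arrow_table(pd.DataFrame({"a": [1, 2, 3]}))

# Generic Python input: wrapped via pl.DataFrame(obj) to reach the polars cast path.
tbl2 = any_to_arrow_table({"a": [1, 2, 3], "b": ["x", "y", "z"]})
assert isinstance(tbl2, pa.Table)
```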
any_to_arrow_record_batch ¶
Convert to a single pa.RecordBatch.
any_to_arrow_batch_iterator ¶
any_to_arrow_batch_iterator(
obj: Any, options: Optional[CastOptions] = None
) -> Iterator[pa.RecordBatch]
Convert any supported object to a lazy iterator of pa.RecordBatch.
Per-batch Arrow cast and `byte_size` / `row_size` rechunking are owned by :meth:`CastOptions.cast_arrow_batch_iterator`, which delegates to the nested struct rechunker. The job here is to produce the source batch stream — engine-native casts happen upstream when an engine (polars / spark / pandas) owns the data.
For Polars LazyFrame and Spark DataFrame, the engine-native cast is applied before serialization, so the rechunker sees already-cast batches and skips per-batch Arrow rework.
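A usage sketch with default options; `row_size` / `byte_size` chunking would come from the `CastOptions` passed in, which is omitted here because its constructor is not documented above:

```python
import polars as pl
from yggdrasil.arrow.cast import any_to_arrow_batch_iterator

lf = pl.LazyFrame({"x": range(100_000)}).with_columns(pl.col("x") * 2)

# The LazyFrame is cast/collected engine-side and streamed as RecordBatches;
# the caller never materializes one big Table up front.
for batch in any_to_arrow_batch_iterator(lf):
    print(batch.num_rows)
```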
any_to_arrow_record_batch_reader ¶
any_to_arrow_record_batch_reader(
obj: Any, options: Optional[CastOptions] = None
) -> pa.RecordBatchReader
Wrap any_to_arrow_batch_iterator behind a RecordBatchReader.
Output schema: merged_schema → target_schema →
first-batch peek.
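A usage sketch; the schema resolution order shown in the comment is the one described above, and the example input is made up:

```python
import pyarrow as pa
from yggdrasil.arrow.cast import any_to_arrow_record_batch_reader

reader = any_to_arrow_record_batch_reader({"a": [1, 2, 3], "b": ["x", "y", "z"]})
assert isinstance(reader, pa.RecordBatchReader)
print(reader.schema)       # merged/target schema, or peeked from the first batch
table = reader.read_all()  # drain the stream into a single Table
```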
any_to_arrow_scalar ¶
Convert a Python value to an Arrow scalar, then cast to target type.
cast_arrow_scalar ¶
Cast an Arrow scalar via the array path.
cast_arrow_array ¶
cast_arrow_array(
array: Union[ChunkedArray, Array], options: Optional[CastOptions] = None
) -> Union[pa.ChunkedArray, pa.Array]
Cast a pyarrow Array/ChunkedArray.
cast_arrow_tabular ¶
cast_arrow_tabular(
data: Union[Table, RecordBatch], options: Optional[CastOptions] = None
) -> Union[pa.Table, pa.RecordBatch]
Cast pyarrow Table/RecordBatch with skip-cast on schema match.
cast_arrow_record_batch_reader ¶
cast_arrow_record_batch_reader(
data: RecordBatchReader, options: Optional[CastOptions] = None
) -> pa.RecordBatchReader
Lazily wrap a RecordBatchReader with on-the-fly cast.
Pure passthrough when the source schema matches target and no chunking is requested.
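A minimal sketch of the passthrough case, with no target schema bound and no chunking requested; the expectation in the comment follows the description above:

```python
import pyarrow as pa
from yggdrasil.arrow.cast import cast_arrow_record_batch_reader

batch = pa.RecordBatch.from_pydict({"a": [1, 2, 3]})
source = pa.RecordBatchReader.from_batches(batch.schema, [batch])

# No target bound and no row/byte chunking: batches are expected to flow
# through unchanged rather than being rebuilt.
for out in cast_arrow_record_batch_reader(source):
    print(out.num_rows)
```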