
Util

read_tf_record

read_tf_record(
    path: Path,
    *,
    compressed: bool = True,
    process: Optional[
        Callable[[Example], Optional[Example]]
    ] = None,
    verbose: bool = False,
    threads: int = 20
) -> Data

Reads a TensorFlow Record dataset and returns a dict of numpy arrays.

This method can be used to read TFRecord files before applying YDF to them.

Warning: Reading examples in Python is very slow. Consider providing paths to YDF directly instead (e.g., model.predict("record:" + path)), which is ~20x faster.

Usage example:

import numpy as np
import ydf

# Load a dataset
ds = ydf.util.read_tf_record(path="/path/to/tfrecord")

# Apply some pre-processing
ds["my_label"] = np.log(ds["my_label"])

# Train a model
ydf.RandomForestLearner(label="my_label",
  task=ydf.Task.REGRESSION).train(ds)

This method requires all TF Examples to have the same features, and all features to have the same type and number of values. If your TFRecord encodes missing values by skipping features, you can use the process argument to add the missing values manually:

import math

import tensorflow as tf
import ydf

def process(example: tf.train.Example) -> tf.train.Example:
  # Add missing values for categorical features.
  for key in ["feature_1", "feature_2"]:
    if key not in example.features.feature:
      example.features.feature[key].bytes_list.value.append(b"")

  # Add missing values for numerical features.
  for key in ["feature_3", "feature_4"]:
    if key not in example.features.feature:
      example.features.feature[key].float_list.value.append(math.nan)

  return example

path = "/path/to/tfrecord"
read_ds = ydf.util.read_tf_record(path, process=process)
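The uniformity requirement follows from the return type: a dict of numpy arrays is rectangular, so every example must contribute exactly one row, of the same width, to every feature. The sketch below is a hypothetical illustration of that constraint, not YDF's implementation. It models each example as a plain Python dict from feature name to a list of values (instead of a tf.train.Example), and the helper name stack_examples is made up for this illustration. It checks feature names and value counts only; value types are not checked.

```python
from typing import Dict, List

import numpy as np


def stack_examples(examples: List[Dict[str, list]]) -> Dict[str, np.ndarray]:
    """Stacks per-example feature values into a dict of numpy arrays.

    Hypothetical helper: each example maps a feature name to a list of values.
    """
    if not examples:
        return {}
    expected_keys = set(examples[0])
    expected_sizes = {key: len(values) for key, values in examples[0].items()}
    for i, example in enumerate(examples):
        # Every example must define exactly the same features.
        if set(example) != expected_keys:
            raise ValueError(
                f"Example {i} has features {sorted(example)}, "
                f"expected {sorted(expected_keys)}")
        # Every feature must have the same number of values in all examples.
        for key, values in example.items():
            if len(values) != expected_sizes[key]:
                raise ValueError(
                    f"Feature {key!r} of example {i} has {len(values)} values, "
                    f"expected {expected_sizes[key]}")
    data = {}
    for key in sorted(expected_keys):
        array = np.array([example[key] for example in examples])
        # Single-valued features become 1-D arrays; multi-valued ones stay 2-D.
        data[key] = array[:, 0] if expected_sizes[key] == 1 else array
    return data
```

For example, stack_examples([{"f1": [1.0]}, {"f2": [2.0]}]) raises a ValueError because the second example skips "f1". Filling the skipped feature with a placeholder value, as the process function above does with math.nan or b"", makes the examples stackable again.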

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| path | Path | Path or list of paths to TFRecord files. Supports sharded paths. | required |
| compressed | bool | Whether the TFRecord is compressed. | True |
| process | Optional[Callable[[Example], Optional[Example]]] | Optional function to process each TF Example. Can be used to filter out some features or to fix some example values (e.g., to ensure that all examples have consistent feature values). | None |
| verbose | bool | If True, print the status of the dataset reading. | False |
| threads | int | Number of reading threads. | 20 |

Returns:

| Type | Description |
|---|---|
| Data | A dict of numpy arrays. |

write_tf_record

write_tf_record(
    data: Data,
    *,
    path: Path,
    compressed: bool = True,
    process: Optional[Callable[[Example], Example]] = None,
    verbose: bool = False,
    threads: int = 20
) -> None

Writes a dict of numpy arrays to a TensorFlow Record file.

This method can be used to prepare TFRecord files for distributed training.

Usage example:

import ydf
import numpy as np

# Generate a dataset
dataset = {
    "f1": np.array([1, 2, 3]),
    "f2": np.array([1.1, 2.2, 3.3])}

# Write the dataset
ydf.util.write_tf_record(dataset, path="/path/to/tfrecord")
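Conceptually, each index along the first axis of the arrays corresponds to one TF Example, so all arrays in the dict should have the same length. The sketch below is a hypothetical illustration of that row-wise view and is not part of YDF: the helper iter_rows yields one {feature: value} dict per example instead of building tf.train.Example protos.

```python
from typing import Dict, Iterator

import numpy as np


def iter_rows(data: Dict[str, np.ndarray]) -> Iterator[Dict[str, np.ndarray]]:
    """Yields one {feature: value} dict per example (hypothetical helper)."""
    lengths = {key: len(values) for key, values in data.items()}
    # All features must describe the same number of examples.
    if len(set(lengths.values())) > 1:
        raise ValueError(
            f"All features must have the same number of examples, got {lengths}")
    num_examples = next(iter(lengths.values()), 0)
    for i in range(num_examples):
        # Row i of every array forms the i-th example.
        yield {key: values[i] for key, values in data.items()}
```

With the dataset from the usage example above, list(iter_rows(dataset)) has three entries, and the second one holds f1 = 2 and f2 = 2.2.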

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| data | Data | A dict of numpy arrays. | required |
| path | Path | Path or list of paths to TFRecord files. Supports sharded paths. | required |
| compressed | bool | Whether the TFRecord is compressed. | True |
| process | Optional[Callable[[Example], Example]] | Optional function to process each TF Example. | None |
| verbose | bool | If True, print the status of the dataset writing. | False |
| threads | int | Number of writing threads. | 20 |

get_vertex_ai_cluster_spec

get_vertex_ai_cluster_spec(
    cluster_spec: Union[str, None] = None,
) -> VertexAIClusterSpec

Parses the Vertex AI cluster specification.

The cluster specification is a JSON string describing the nodes of the cluster. This specification is typically provided by Vertex AI as an environment variable.
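Vertex AI documents CLUSTER_SPEC as a JSON object whose "cluster" entry maps each worker pool name ("workerpool0", "workerpool1", ...) to a list of "host:port" addresses, and whose "task" entry identifies the pool and index of the current node. The following is a minimal sketch of how such a string could be mapped to the three fields of VertexAIClusterSpec. It is not YDF's implementation: the class ClusterSpecSketch, the function parse_cluster_spec, the choice of workerpool0 as the manager and workerpool1 as the workers, and reading the port from the current node's own address are all assumptions. Workers are kept as "host:port" strings here.

```python
import dataclasses
import json
import os
from typing import List, Optional


@dataclasses.dataclass
class ClusterSpecSketch:
    """Mirrors the fields of VertexAIClusterSpec (illustration only)."""
    workers: List[str]
    is_worker: bool
    port: int


def parse_cluster_spec(cluster_spec: Optional[str] = None) -> ClusterSpecSketch:
    # Fall back to the CLUSTER_SPEC environment variable, like the documented default.
    if cluster_spec is None:
        cluster_spec = os.environ["CLUSTER_SPEC"]
    spec = json.loads(cluster_spec)
    task = spec["task"]
    # Assumption: "workerpool0" holds the manager and "workerpool1" the workers.
    workers = spec["cluster"].get("workerpool1", [])
    is_worker = task["type"] != "workerpool0"
    # Assumption: the port is taken from the current node's own "host:port" entry.
    own_address = spec["cluster"][task["type"]][task["index"]]
    port = int(own_address.rsplit(":", 1)[1])
    return ClusterSpecSketch(workers=workers, is_worker=is_worker, port=port)
```

For a node whose task is {"type": "workerpool1", "index": 1}, this sketch reports is_worker=True and uses the port of the second workerpool1 address.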

Usage example:

In a train.py file running in a Docker container on Vertex AI:

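import argparse

import ydf

def main(args):
  # Gather the manager and workers configuration.
  cluster_config = ydf.util.get_vertex_ai_cluster_spec()
  print("cluster_config:", cluster_config)

  if cluster_config.is_worker:
    # This machine is running a worker.
    ydf.start_worker(cluster_config.port)
    return

  # This machine is the manager: it drives the distributed training.
  print("Train model with distribution")
  learner = ydf.DistributedGradientBoostedTreesLearner(
      label=...,
      working_dir=...,
      workers=cluster_config.workers,
      resume_training=True,
  )
  model = learner.train(args.train_ds)

if __name__ == "__main__":
  parser = argparse.ArgumentParser()
  parser.add_argument("--train_ds", help="Path to the training dataset.")
  main(parser.parse_args())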

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| cluster_spec | Union[str, None] | Cluster specification as a JSON string. If None, the CLUSTER_SPEC environment variable is used instead. | None |

Returns:

| Type | Description |
|---|---|
| VertexAIClusterSpec | The parsed cluster specification. |

VertexAIClusterSpec dataclass

VertexAIClusterSpec(
    workers: List[str], is_worker: bool, port: int
)

Description of the cluster for Vertex AI.

Attributes:

| Name | Type | Description |
|---|---|---|
| workers | List[str] | List of IP addresses of the worker nodes. |
| is_worker | bool | True if the current node is a worker node. False if it is the manager. |
| port | int | Port number to use for communication between nodes. |


Google Internal

  • ydf.util.read_tf_recordio
  • ydf.util.write_tf_recordio
  • ydf.util.read_f1_query