Source code for ray.train.huggingface.transformers.transformers_predictor
import logging
from typing import TYPE_CHECKING, List, Optional, Type, Union
import pandas as pd
from ray.air.checkpoint import Checkpoint
from ray.air.constants import TENSOR_COLUMN_NAME
from ray.air.data_batch_type import DataBatchType
from ray.train.predictor import Predictor
from ray.util import log_once
from ray.util.annotations import PublicAPI
# Resolve a GPU-count helper for PyTorch, degrading gracefully when the
# package is not installed.
try:
    import torch
except ImportError:

    def torch_get_gpus():
        """Fallback used when PyTorch cannot be imported: report zero GPUs."""
        return 0

else:
    # Bind the PyTorch function directly (an alias, not a wrapper).
    torch_get_gpus = torch.cuda.device_count
# Resolve a GPU-count helper for TensorFlow, degrading gracefully when the
# package is not installed.
try:
    import tensorflow
except ImportError:

    def tf_get_gpus():
        """Fallback used when TensorFlow cannot be imported: report zero GPUs."""
        return 0

else:

    def tf_get_gpus():
        """Return how many physical GPU devices TensorFlow lists."""
        gpu_devices = tensorflow.config.list_physical_devices("GPU")
        return len(gpu_devices)
# `transformers` is an optional dependency. Instead of failing at import time,
# remember the ImportError here so it can be re-raised later by the code that
# actually needs the library.
TRANSFORMERS_IMPORT_ERROR: Optional[ImportError] = None

try:
    from transformers.pipelines import Pipeline, pipeline as pipeline_factory
    from transformers.pipelines.table_question_answering import (
        TableQuestionAnsweringPipeline,
    )
except ImportError as import_exc:
    TRANSFORMERS_IMPORT_ERROR = import_exc
# Imported for static type checkers only. In the code visible here these names
# are used solely inside quoted annotations, so they are not needed at runtime.
if TYPE_CHECKING:
    from ray.data.preprocessor import Preprocessor
    from transformers.modeling_utils import PreTrainedModel
    from transformers.modeling_tf_utils import TFPreTrainedModel

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
@PublicAPI(stability="alpha")
class TransformersPredictor(Predictor):
    """A predictor for HuggingFace Transformers PyTorch models.

    This predictor uses Transformers Pipelines for inference.

    Args:
        pipeline: The Transformers pipeline to use for inference.
        preprocessor: A preprocessor used to transform data batches prior
            to prediction.
        use_gpu: Whether prediction is meant to run on GPU. The constructor
            only records this flag and, when it is False while GPUs are
            detected, logs a one-time warning. Device placement itself is
            a property of ``pipeline``; ``from_checkpoint`` configures it
            when it builds the pipeline.
    """

    def __init__(
        self,
        pipeline: Optional["Pipeline"] = None,
        preprocessor: Optional["Preprocessor"] = None,
        use_gpu: bool = False,
    ):
        # The `transformers` import is deferred at module level; surface the
        # failure here, where the library is actually required.
        if TRANSFORMERS_IMPORT_ERROR is not None:
            raise TRANSFORMERS_IMPORT_ERROR

        self.pipeline = pipeline
        self.use_gpu = use_gpu

        # Take the larger of what either framework reports.
        num_gpus = max(torch_get_gpus(), tf_get_gpus())
        if not use_gpu and num_gpus > 0 and log_once("hf_predictor_not_using_gpu"):
            logger.warning(
                "You have `use_gpu` as False but there are "
                f"{num_gpus} GPUs detected on host where "
                "prediction will only use CPU. Please consider explicitly "
                "setting `TransformersPredictor(use_gpu=True)` or "
                "`batch_predictor.predict(ds, num_gpus_per_worker=1)` to "
                "enable GPU prediction. Ignore if you have set `device` or "
                "`device_map` arguments in the `pipeline` manually."
            )

        super().__init__(preprocessor)

    def __repr__(self):
        return (
            f"{self.__class__.__name__}(pipeline={self.pipeline!r}, "
            f"preprocessor={self._preprocessor!r})"
        )

    @classmethod
    def from_checkpoint(
        cls,
        checkpoint: Checkpoint,
        *,
        pipeline_cls: Optional[Type["Pipeline"]] = None,
        model_cls: Optional[
            Union[str, Type["PreTrainedModel"], Type["TFPreTrainedModel"]]
        ] = None,
        pretrained_model_kwargs: Optional[dict] = None,
        use_gpu: bool = False,
        **pipeline_kwargs,
    ) -> "TransformersPredictor":
        """Instantiate the predictor from a Checkpoint.

        The checkpoint is expected to be a result of ``TransformersTrainer``.

        Note that the Transformers ``pipeline`` used internally expects to
        receive raw text. If you have any Preprocessors in Checkpoint
        that tokenize the data, remove them by calling
        ``Checkpoint.set_preprocessor(None)`` beforehand.

        Args:
            checkpoint: The checkpoint to load the model, tokenizer and
                preprocessor from. It is expected to be from the result of a
                ``TransformersTrainer`` run.
            pipeline_cls: A ``transformers.pipelines.Pipeline`` class to use.
                If not specified, will use the ``pipeline`` abstraction
                wrapper.
            model_cls: A ``transformers.PreTrainedModel`` class to create from
                the checkpoint.
            pretrained_model_kwargs: If set and a ``model_cls`` is provided, will
                be passed to ``TransformersCheckpoint.get_model()``.
            use_gpu: If set, the model will be moved to GPU on instantiation and
                prediction happens on GPU.
            **pipeline_kwargs: Any kwargs to pass to the pipeline
                initialization. If ``pipeline_cls`` is None, this must contain
                the 'task' argument. Can be used
                to override the tokenizer with 'tokenizer'. If ``use_gpu`` is
                True, 'device' will be set to 0 by default, unless 'device_map' is
                passed.

        Returns:
            A ``TransformersPredictor`` wrapping the constructed pipeline and
            the preprocessor stored in ``checkpoint``.

        Raises:
            ImportError: If ``transformers`` could not be imported.
            ValueError: If neither ``pipeline_cls`` nor a 'task' kwarg is
                given, or if ``model_cls`` is given but ``checkpoint`` is not
                a ``TransformersCheckpoint``.
        """
        from ray.train.huggingface import TransformersCheckpoint

        if TRANSFORMERS_IMPORT_ERROR is not None:
            raise TRANSFORMERS_IMPORT_ERROR

        if not pipeline_cls and "task" not in pipeline_kwargs:
            raise ValueError(
                "If `pipeline_cls` is not specified, 'task' must be passed as a kwarg."
            )

        if use_gpu and "device_map" not in pipeline_kwargs:
            # Default to the GPU with the first index; an explicit 'device'
            # supplied by the caller is left untouched.
            pipeline_kwargs.setdefault("device", 0)

        model = None
        if model_cls:
            if not isinstance(checkpoint, TransformersCheckpoint):
                raise ValueError(
                    "If `model_cls` is passed, the checkpoint has to be a "
                    "`TransformersCheckpoint`."
                )
            pretrained_model_kwargs = pretrained_model_kwargs or {}
            model = checkpoint.get_model(model_cls, **pretrained_model_kwargs)

        # Compare against None explicitly instead of relying on the loaded
        # model object's truthiness.
        if pipeline_cls and model is not None:
            # Custom pipeline class and an explicitly loaded model:
            # instantiate the pipeline class directly.
            pipeline = pipeline_cls(model, **pipeline_kwargs)
        else:
            if pipeline_cls:
                # Custom pipeline class, handed to the factory.
                pipeline_kwargs["pipeline_class"] = pipeline_cls
            if model is None:
                # Infer the model from the checkpoint directory.
                with checkpoint.as_directory() as checkpoint_path:
                    # Tokenizer will be loaded automatically (no need to
                    # specify `tokenizer=checkpoint_path`).
                    pipeline = pipeline_factory(
                        model=checkpoint_path, **pipeline_kwargs
                    )
            else:
                # Use the loaded model with the default pipeline.
                pipeline = pipeline_factory(model=model, **pipeline_kwargs)

        preprocessor = checkpoint.get_preprocessor()
        return cls(
            pipeline=pipeline,
            preprocessor=preprocessor,
            use_gpu=use_gpu,
        )

    def _predict(
        self, data: Union[list, pd.DataFrame], **pipeline_call_kwargs
    ) -> pd.DataFrame:
        """Call the pipeline on ``data`` and return its output as a DataFrame.

        Column labels of the returned frame are always converted to strings.
        """
        ret = self.pipeline(data, **pipeline_call_kwargs)
        # Remove unnecessary lists: unwrap per-input results that are
        # single-element lists.
        try:
            unwrapped = [
                x[0] if isinstance(x, list) and len(x) == 1 else x for x in ret
            ]
            df = pd.DataFrame(unwrapped)
        except Exception:
            # Deliberate best effort: if unwrapping fails for any reason,
            # fall back to the raw pipeline output.
            df = pd.DataFrame(ret)
        df.columns = [str(col) for col in df.columns]
        return df

    @staticmethod
    def _convert_data_for_pipeline(
        data: pd.DataFrame, pipeline: "Pipeline"
    ) -> Union[list, pd.DataFrame]:
        """Convert the data into a format accepted by the pipeline.

        In most cases, this format is a list of strings.

        Args:
            data: The batch to convert.
            pipeline: The pipeline that will consume the converted data.

        Returns:
            ``data`` unchanged for a ``TableQuestionAnsweringPipeline``.
            Otherwise a list of per-column value lists, or, when ``data``
            has exactly one column, that column's values as a flat list.
        """
        # Special case where pd.DataFrame is allowed.
        if isinstance(pipeline, TableQuestionAnsweringPipeline):
            # TODO(team-ml): This may be a performance bottleneck.
            return data

        # Otherwise, a list of columns as lists.
        columns = [data[col].to_list() for col in data.columns]
        # Flatten if it's only one column. Unwrap exactly one level: a
        # single-row batch must still reach the pipeline as a one-element
        # list, so that one input row maps to one result.
        if len(columns) == 1:
            columns = columns[0]
        return columns

    def predict(
        self,
        data: DataBatchType,
        feature_columns: Optional[Union[List[str], List[int]]] = None,
        **predict_kwargs,
    ) -> DataBatchType:
        """Run inference on data batch.

        The data is converted into a list (unless ``pipeline`` is a
        ``TableQuestionAnsweringPipeline``) and passed to the ``pipeline``
        object.

        Args:
            data: A batch of input data. Either a pandas DataFrame or numpy
                array.
            feature_columns: The names or indices of the columns in the
                data to use as features to predict on. If None, use all
                columns.
            **predict_kwargs: additional kwargs to pass to the
                ``pipeline`` object.

        Examples:
            >>> import pandas as pd
            >>> from transformers import AutoConfig, AutoModelForCausalLM, AutoTokenizer
            >>> from transformers.pipelines import pipeline
            >>> from ray.train.huggingface import TransformersPredictor
            >>>
            >>> model_checkpoint = "gpt2"
            >>> tokenizer_checkpoint = "sgugger/gpt2-like-tokenizer"
            >>> tokenizer = AutoTokenizer.from_pretrained(tokenizer_checkpoint)
            >>>
            >>> model_config = AutoConfig.from_pretrained(model_checkpoint)
            >>> model = AutoModelForCausalLM.from_config(model_config)
            >>> predictor = TransformersPredictor(
            ...     pipeline=pipeline(
            ...         task="text-generation", model=model, tokenizer=tokenizer
            ...     )
            ... )
            >>>
            >>> prompts = pd.DataFrame(
            ...     ["Complete me", "And me", "Please complete"], columns=["sentences"]
            ... )
            >>> predictions = predictor.predict(prompts)

        Returns:
            Prediction result.
        """
        return Predictor.predict(
            self, data, feature_columns=feature_columns, **predict_kwargs
        )

    def _predict_pandas(
        self,
        data: "pd.DataFrame",
        feature_columns: Optional[List[str]] = None,
        **pipeline_call_kwargs,
    ) -> "pd.DataFrame":
        """Select the feature columns of a pandas batch and run the pipeline.

        Args:
            data: The batch. If it contains the ``TENSOR_COLUMN_NAME``
                column, ``feature_columns`` index into that column's array;
                otherwise they select DataFrame columns.
            feature_columns: Columns (or tensor indices) to predict on. If
                empty or None, all of ``data`` is used.
            **pipeline_call_kwargs: Forwarded to the pipeline call.

        Returns:
            The pipeline output as a DataFrame.
        """
        if TENSOR_COLUMN_NAME in data:
            arr = data[TENSOR_COLUMN_NAME].to_numpy()
            if feature_columns:
                # The new frame's columns are relabelled 0..k-1, so it must
                # not be indexed with `feature_columns` a second time. Doing
                # so raises KeyError or silently reorders the columns.
                data = pd.DataFrame(arr[:, feature_columns])
        elif feature_columns:
            data = data[feature_columns]

        data = self._convert_data_for_pipeline(data, self.pipeline)
        return self._predict(data, **pipeline_call_kwargs)