Training a model with distributed LightGBM
Training a model with distributed LightGBM#
In this example we will train a model in Ray AIR using distributed LightGBM.
Let’s start with installing our dependencies:
!pip install -qU "ray[tune]" lightgbm_ray
[notice] A new release of pip available: 22.3.1 -> 23.1.2
[notice] To update, run: pip install --upgrade pip
Then we need some imports:
from typing import Tuple
import ray
from ray.train.lightgbm import LightGBMPredictor
from ray.data.preprocessors.chain import Chain
from ray.data.preprocessors.encoder import Categorizer
from ray.train.lightgbm import LightGBMTrainer
from ray.train import Result, ScalingConfig
from ray.data import Dataset
from ray.data.preprocessors import StandardScaler
/Users/balaji/Documents/GitHub/ray/.venv/lib/python3.11/site-packages/tqdm/auto.py:21: TqdmWarning: IProgress not found. Please update jupyter and ipywidgets. See https://ipywidgets.readthedocs.io/en/stable/user_install.html
from .autonotebook import tqdm as notebook_tqdm
2023-07-07 14:34:14,951 INFO util.py:159 -- Missing packages: ['ipywidgets']. Run `pip install -U ipywidgets`, then restart the notebook server for rich notebook output.
2023-07-07 14:34:15,892 INFO util.py:159 -- Missing packages: ['ipywidgets']. Run `pip install -U ipywidgets`, then restart the notebook server for rich notebook output.
Next we define a function to load our train, validation, and test datasets.
def prepare_data() -> Tuple[Dataset, Dataset, Dataset]:
dataset = ray.data.read_csv("s3://ano[email protected]/breast_cancer_with_categorical.csv")
train_dataset, valid_dataset = dataset.train_test_split(test_size=0.3)
test_dataset = valid_dataset.drop_columns(cols=["target"])
return train_dataset, valid_dataset, test_dataset
The following function will create a LightGBM trainer, train it, and return the result.
def train_lightgbm(num_workers: int, use_gpu: bool = False) -> Result:
train_dataset, valid_dataset, _ = prepare_data()
# Scale some random columns, and categorify the categorical_column,
# allowing LightGBM to use its built-in categorical feature support
preprocessor = Chain(
Categorizer(["categorical_column"]),
StandardScaler(columns=["mean radius", "mean texture"])
)
# LightGBM specific params
params = {
"objective": "binary",
"metric": ["binary_logloss", "binary_error"],
}
trainer = LightGBMTrainer(
scaling_config=ScalingConfig(num_workers=num_workers, use_gpu=use_gpu),
label_column="target",
params=params,
datasets={"train": train_dataset, "valid": valid_dataset},
preprocessor=preprocessor,
num_boost_round=100,
)
result = trainer.fit()
print(result.metrics)
return result
Once we have the result, we can do batch inference on the obtained model. Let’s define a utility function for this.
import pandas as pd
from ray.train import Checkpoint
from ray.data import ActorPoolStrategy
class Predict:
def __init__(self, checkpoint: Checkpoint):
self.predictor = LightGBMPredictor.from_checkpoint(checkpoint)
def __call__(self, batch: pd.DataFrame) -> pd.DataFrame:
return self.predictor.predict(batch)
def predict_lightgbm(result: Result):
_, _, test_dataset = prepare_data()
scores = test_dataset.map_batches(
Predict,
fn_constructor_args=[result.checkpoint],
compute=ActorPoolStrategy(),
batch_format="pandas"
)
predicted_labels = scores.map_batches(lambda df: (df > 0.5).astype(int), batch_format="pandas")
print(f"PREDICTED LABELS")
predicted_labels.show()
Now we can run the training:
result = train_lightgbm(num_workers=2, use_gpu=False)
Tune Status
| Current time: | 2023-07-07 14:34:34 |
| Running for: | 00:00:06.06 |
| Memory: | 12.2/64.0 GiB |
System Info
Using FIFO scheduling algorithm.Logical resource usage: 4.0/10 CPUs, 0/0 GPUs
Trial Status
| Trial name | status | loc | iter | total time (s) | train-binary_logloss | train-binary_error | valid-binary_logloss |
|---|---|---|---|---|---|---|---|
| LightGBMTrainer_0c5ae_00000 | TERMINATED | 127.0.0.1:10027 | 101 | 4.5829 | 0.000202293 | 0 | 0.130232 |
(LightGBMTrainer pid=10027) The `preprocessor` arg to Trainer is deprecated. Apply preprocessor transformations ahead of time by calling `preprocessor.transform(ds)`. Support for the preprocessor arg will be dropped in a future release.
(LightGBMTrainer pid=10027) Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[MapBatches(get_pd_value_counts)]
(LightGBMTrainer pid=10027) Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
(LightGBMTrainer pid=10027) Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`
(LightGBMTrainer pid=10027) Tip: Use `take_batch()` instead of `take() / show()` to return records in pandas or numpy batch format.
(LightGBMTrainer pid=10027) Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[MapBatches(Categorizer._transform_pandas)] -> AllToAllOperator[Aggregate]
(LightGBMTrainer pid=10027) Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
(LightGBMTrainer pid=10027) Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`
(pid=10027) Running: 0.0/10.0 CPU, 0.0/0.0 GPU, 0.0 MiB/512.0 MiB object_store_memory: 0%| | 0/14 [00:00<?, ?it/s]
(LightGBMTrainer pid=10027) Warning: The Ray cluster currently does not have any available CPUs. The Dataset job will hang unless more CPUs are freed up. A common reason is that cluster resources are used by Actors or Tune trials; see the following link for more details: https://docs.ray.io/en/master/data/dataset-internals.html#datasets-and-tune
(pid=10027) Running: 0.0/10.0 CPU, 0.0/0.0 GPU, 0.0 MiB/512.0 MiB object_store_memory: 7%|▋ | 1/14 [00:00<00:01, 9.53it/s]
(LightGBMTrainer pid=10027) Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[MapBatches(Categorizer._transform_pandas)->MapBatches(StandardScaler._transform_pandas)]
(pid=10027) Running: 0.0/10.0 CPU, 0.0/0.0 GPU, 0.0 MiB/512.0 MiB object_store_memory: 7%|▋ | 1/14 [00:00<00:01, 7.59it/s]
(LightGBMTrainer pid=10027) Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
(pid=10027) Running: 0.0/10.0 CPU, 0.0/0.0 GPU, 0.0 MiB/512.0 MiB object_store_memory: 7%|▋ | 1/14 [00:00<00:01, 6.59it/s]
(LightGBMTrainer pid=10027) Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`
(LightGBMTrainer pid=10027) Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[MapBatches(Categorizer._transform_pandas)->MapBatches(StandardScaler._transform_pandas)]
(LightGBMTrainer pid=10027) Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
(LightGBMTrainer pid=10027) Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`
(_RemoteRayLightGBMActor pid=10063) [LightGBM] [Info] Trying to bind port 51134...
(_RemoteRayLightGBMActor pid=10063) [LightGBM] [Info] Binding port 51134 succeeded
(_RemoteRayLightGBMActor pid=10063) [LightGBM] [Info] Listening...
(_RemoteRayLightGBMActor pid=10062) [LightGBM] [Warning] Connecting to rank 1 failed, waiting for 200 milliseconds
(_RemoteRayLightGBMActor pid=10063) [LightGBM] [Info] Connected to rank 0
(_RemoteRayLightGBMActor pid=10063) [LightGBM] [Info] Local rank: 1, total number of machines: 2
(_RemoteRayLightGBMActor pid=10063) [LightGBM] [Warning] num_threads is set=2, n_jobs=-1 will be ignored. Current value: num_threads=2
(_RemoteRayLightGBMActor pid=10062) /Users/balaji/Documents/GitHub/ray/.venv/lib/python3.11/site-packages/lightgbm/basic.py:1780: UserWarning: Overriding the parameters from Reference Dataset.
(_RemoteRayLightGBMActor pid=10062) _log_warning('Overriding the parameters from Reference Dataset.')
(_RemoteRayLightGBMActor pid=10062) /Users/balaji/Documents/GitHub/ray/.venv/lib/python3.11/site-packages/lightgbm/basic.py:1513: UserWarning: categorical_column in param dict is overridden.
(_RemoteRayLightGBMActor pid=10062) _log_warning(f'{cat_alias} in param dict is overridden.')
2023-07-07 14:34:34,087 INFO tune.py:1148 -- Total run time: 7.18 seconds (6.05 seconds for the tuning loop).
{'train-binary_logloss': 0.00020229312743896637, 'train-binary_error': 0.0, 'valid-binary_logloss': 0.13023245107091222, 'valid-binary_error': 0.023529411764705882, 'time_this_iter_s': 0.021785974502563477, 'should_checkpoint': True, 'done': True, 'training_iteration': 101, 'trial_id': '0c5ae_00000', 'date': '2023-07-07_14-34-34', 'timestamp': 1688765674, 'time_total_s': 4.582904100418091, 'pid': 10027, 'hostname': 'Balajis-MacBook-Pro-16', 'node_ip': '127.0.0.1', 'config': {}, 'time_since_restore': 4.582904100418091, 'iterations_since_restore': 101, 'experiment_tag': '0'}
And perform inference on the obtained model:
predict_lightgbm(result)
2023-07-07 14:34:36,769 INFO read_api.py:374 -- To satisfy the requested parallelism of 20, each read task output will be split into 20 smaller blocks.
2023-07-07 14:34:38,655 WARNING plan.py:567 -- Warning: The Ray cluster currently does not have any available CPUs. The Dataset job will hang unless more CPUs are freed up. A common reason is that cluster resources are used by Actors or Tune trials; see the following link for more details: https://docs.ray.io/en/master/data/dataset-internals.html#datasets-and-tune
2023-07-07 14:34:38,668 INFO dataset.py:2180 -- Tip: Use `take_batch()` instead of `take() / show()` to return records in pandas or numpy batch format.
2023-07-07 14:34:38,674 INFO streaming_executor.py:92 -- Executing DAG InputDataBuffer[Input] -> ActorPoolMapOperator[MapBatches(<lambda>)->MapBatches(Predict)] -> TaskPoolMapOperator[MapBatches(<lambda>)]
2023-07-07 14:34:38,674 INFO streaming_executor.py:93 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
2023-07-07 14:34:38,676 INFO streaming_executor.py:95 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`
2023-07-07 14:34:38,701 INFO actor_pool_map_operator.py:117 -- MapBatches(<lambda>)->MapBatches(Predict): Waiting for 1 pool actors to start...
PREDICTED LABELS
{'predictions': 1}
{'predictions': 1}
{'predictions': 0}
{'predictions': 1}
{'predictions': 1}
{'predictions': 1}
{'predictions': 1}
{'predictions': 1}
{'predictions': 1}
{'predictions': 1}
{'predictions': 0}
{'predictions': 1}
{'predictions': 1}
{'predictions': 1}
{'predictions': 1}
{'predictions': 0}
{'predictions': 1}
{'predictions': 1}
{'predictions': 1}
{'predictions': 0}