Source code for ray.data._internal.execution.interfaces.execution_options

import os
from dataclasses import dataclass, field
from typing import List, Optional, Union

from .common import NodeIdStr
from ray.data._internal.execution.util import memory_string
from ray.util.annotations import DeveloperAPI


@dataclass
class ExecutionResources:
    """Specifies resources usage or resource limits for execution.

    The value `None` represents unknown resource usage or an unspecified limit.
    """

    # CPU usage in cores (Ray logical CPU slots).
    cpu: Optional[float] = None

    # GPU usage in devices (Ray logical GPU slots).
    gpu: Optional[float] = None

    # Object store memory usage in bytes.
    object_store_memory: Optional[int] = None

    def object_store_memory_str(self) -> str:
        """Returns a human-readable string for the object store memory field.

        Returns:
            The literal string "None" when the field is unset, otherwise the
            result of `memory_string` applied to the stored byte count.
        """
        if self.object_store_memory is None:
            return "None"
        return memory_string(self.object_store_memory)

    def add(self, other: "ExecutionResources") -> "ExecutionResources":
        """Adds execution resources.

        A field of the result is `None` only when it is `None` on both
        operands; if just one side is set, the missing side counts as zero.

        Args:
            other: The resources to add to this object.

        Returns:
            A new ExecutionResources object with summed resources. Neither
            operand is modified.
        """
        total = ExecutionResources()
        if self.cpu is not None or other.cpu is not None:
            total.cpu = (self.cpu or 0.0) + (other.cpu or 0.0)
        if self.gpu is not None or other.gpu is not None:
            total.gpu = (self.gpu or 0.0) + (other.gpu or 0.0)
        if (
            self.object_store_memory is not None
            or other.object_store_memory is not None
        ):
            # Fall back to integer zero here: the field holds a byte count
            # declared as `Optional[int]`, and a float fallback would turn
            # an integer operand into a float whenever the other is None.
            total.object_store_memory = (self.object_store_memory or 0) + (
                other.object_store_memory or 0
            )
        return total

    def satisfies_limit(self, limit: "ExecutionResources") -> bool:
        """Return if this resource struct meets the specified limits.

        Note that None for a field means no limit. A field that is None on
        this object (unknown usage) is likewise never treated as exceeding
        a limit.

        Args:
            limit: The per-field upper bounds to check against.

        Returns:
            False if any field set on both objects is strictly greater here
            than in `limit`; True otherwise.
        """
        if self.cpu is not None and limit.cpu is not None and self.cpu > limit.cpu:
            return False
        if self.gpu is not None and limit.gpu is not None and self.gpu > limit.gpu:
            return False
        if (
            self.object_store_memory is not None
            and limit.object_store_memory is not None
            and self.object_store_memory > limit.object_store_memory
        ):
            return False
        return True

    def scale(self, f: float) -> "ExecutionResources":
        """Return copy with all set values scaled by `f`.

        Fields that are None stay None. No rounding is applied, so a
        fractional `f` can produce a non-integer `object_store_memory`.

        Args:
            f: The multiplier applied to every set field.

        Returns:
            A new ExecutionResources object holding the scaled values.
        """
        return ExecutionResources(
            cpu=self.cpu * f if self.cpu is not None else None,
            gpu=self.gpu * f if self.gpu is not None else None,
            object_store_memory=self.object_store_memory * f
            if self.object_store_memory is not None
            else None,
        )
@DeveloperAPI
@dataclass
class ExecutionOptions:
    """Execution settings shared across executors.

    Not every executor honors every option (resource limits, for example,
    may be unsupported).

    Attributes:
        resource_limits: Soft limit on resource usage during execution.
            Not supported in bulk execution mode. Autodetected by default.
        locality_with_output: When set, tasks prefer to run on the same node
            as the output node (the node driving the execution). A list of
            node ids may be given instead to spread the outputs across those
            nodes. Off by default.
        preserve_order: When set, the streaming executor preserves the
            ordering between blocks processed by operators. The bulk
            executor always preserves order. Off by default.
        actor_locality_enabled: Whether locality-aware task dispatch to
            actors is enabled (on by default). Applies to both
            ActorPoolStrategy map and streaming_split operations.
        verbose_progress: Whether progress is reported individually per
            operator. By default, only AllToAll operators and global
            progress is reported. Useful for performance debugging. Off by
            default.
    """

    resource_limits: ExecutionResources = field(default_factory=ExecutionResources)
    locality_with_output: Union[bool, List[NodeIdStr]] = False
    preserve_order: bool = False
    actor_locality_enabled: bool = True
    # The default is computed once, when this class body is evaluated, from
    # the RAY_DATA_VERBOSE_PROGRESS environment variable, which is parsed as
    # an integer: any non-zero value turns verbose progress on.
    verbose_progress: bool = int(os.getenv("RAY_DATA_VERBOSE_PROGRESS", "0")) != 0