Skip to content

Commit 658da94

Browse files
committed
Remove chunk entity in DAG
1 parent 37c08a6 commit 658da94

File tree

37 files changed

+176
-327
lines changed

37 files changed

+176
-327
lines changed

mars/core/__init__.py

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,20 +20,15 @@
2020
EntityData,
2121
ENTITY_TYPE,
2222
Chunk,
23-
ChunkData,
24-
CHUNK_TYPE,
2523
Tileable,
2624
TileableData,
2725
TILEABLE_TYPE,
2826
Object,
2927
ObjectData,
3028
ObjectChunk,
31-
ObjectChunkData,
3229
OBJECT_TYPE,
3330
OBJECT_CHUNK_TYPE,
3431
FuseChunk,
35-
FuseChunkData,
36-
FUSE_CHUNK_TYPE,
3732
OutputType,
3833
register_output_types,
3934
get_output_types,

mars/core/entity/__init__.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,12 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
from .chunks import Chunk, ChunkData, CHUNK_TYPE
15+
from .chunks import Chunk
1616
from .core import Entity, EntityData, ENTITY_TYPE
1717
from .executable import ExecutableTuple, _ExecuteAndFetchMixin
18-
from .fuse import FuseChunk, FuseChunkData, FUSE_CHUNK_TYPE
18+
from .fuse import FuseChunk
1919
from .objects import (
2020
ObjectChunk,
21-
ObjectChunkData,
2221
Object,
2322
ObjectData,
2423
OBJECT_CHUNK_TYPE,

mars/core/entity/chunks.py

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,10 @@
1414

1515
from ...serialization.serializables import BoolField, FieldTypes, TupleField
1616
from ...utils import tokenize
17-
from .core import EntityData, Entity
17+
from .core import EntityData
1818

1919

20-
class ChunkData(EntityData):
20+
class Chunk(EntityData):
2121
__slots__ = ()
2222

2323
is_broadcaster = BoolField("is_broadcaster", default=False)
@@ -56,13 +56,3 @@ def _update_key(self):
5656
*(getattr(self, k, None) for k in self._keys_ if k != "_index"),
5757
),
5858
)
59-
60-
61-
class Chunk(Entity):
62-
_allow_data_type_ = (ChunkData,)
63-
64-
def __repr__(self):
65-
return f"{type(self).__name__}({self._data.__repr__()})"
66-
67-
68-
CHUNK_TYPE = (Chunk, ChunkData)

mars/core/entity/fuse.py

Lines changed: 3 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,13 @@
1515
import numpy as np
1616

1717
from ...serialization.serializables import ReferenceField
18-
from .chunks import ChunkData, Chunk, CHUNK_TYPE
18+
from .chunks import Chunk
1919

2020

21-
class FuseChunkData(ChunkData):
21+
class FuseChunk(Chunk):
2222
__slots__ = ("_inited",)
2323

24-
_chunk = ReferenceField(
25-
"chunk", CHUNK_TYPE, on_serialize=lambda x: x.data if hasattr(x, "data") else x
26-
)
24+
_chunk = ReferenceField("chunk", Chunk)
2725

2826
def __init__(self, *args, **kwargs):
2927
self._inited = False
@@ -63,11 +61,3 @@ def __setattr__(self, attr, value):
6361
@property
6462
def nbytes(self):
6563
return np.prod(self.shape) * self.dtype.itemsize
66-
67-
68-
class FuseChunk(Chunk):
69-
__slots__ = ()
70-
_allow_data_type_ = (FuseChunkData,)
71-
72-
73-
FUSE_CHUNK_TYPE = (FuseChunkData, FuseChunk)

mars/core/entity/objects.py

Lines changed: 4 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,13 @@
1515
from typing import Any, Dict
1616

1717
from ...serialization.serializables import FieldTypes, ListField
18-
from .chunks import ChunkData, Chunk
18+
from .chunks import Chunk
1919
from .core import Entity
2020
from .executable import _ToObjectMixin
2121
from .tileables import TileableData
2222

2323

24-
class ObjectChunkData(ChunkData):
24+
class ObjectChunk(Chunk):
2525
# chunk whose data could be any serializable
2626
__slots__ = ()
2727
type_name = "Object"
@@ -48,23 +48,12 @@ def get_params_from_data(cls, data: Any) -> Dict[str, Any]:
4848
return dict()
4949

5050

51-
class ObjectChunk(Chunk):
52-
__slots__ = ()
53-
_allow_data_type_ = (ObjectChunkData,)
54-
type_name = "Object"
55-
56-
5751
class ObjectData(TileableData, _ToObjectMixin):
5852
__slots__ = ()
5953
type_name = "Object"
6054

6155
# optional fields
62-
_chunks = ListField(
63-
"chunks",
64-
FieldTypes.reference(ObjectChunkData),
65-
on_serialize=lambda x: [it.data for it in x] if x is not None else x,
66-
on_deserialize=lambda x: [ObjectChunk(it) for it in x] if x is not None else x,
67-
)
56+
_chunks = ListField("chunks", FieldTypes.reference(ObjectChunk))
6857

6958
def __init__(self, op=None, nsplits=None, chunks=None, **kw):
7059
super().__init__(_op=op, _nsplits=nsplits, _chunks=chunks, **kw)
@@ -96,4 +85,4 @@ class Object(Entity, _ToObjectMixin):
9685

9786

9887
OBJECT_TYPE = (Object, ObjectData)
99-
OBJECT_CHUNK_TYPE = (ObjectChunk, ObjectChunkData)
88+
OBJECT_CHUNK_TYPE = (ObjectChunk,)

mars/core/entity/output_types.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
import functools
1616
from enum import Enum
1717

18-
from .fuse import FUSE_CHUNK_TYPE
18+
from .fuse import FuseChunk
1919
from .objects import OBJECT_TYPE, OBJECT_CHUNK_TYPE
2020

2121

@@ -83,7 +83,7 @@ def get_output_types(*objs, unknown_as=None):
8383
for obj in objs:
8484
if obj is None:
8585
continue
86-
elif isinstance(obj, FUSE_CHUNK_TYPE):
86+
elif isinstance(obj, FuseChunk):
8787
obj = obj.chunk
8888

8989
try:

mars/core/graph/builder/chunk.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
Union,
2727
)
2828

29-
from ....core import FUSE_CHUNK_TYPE, CHUNK_TYPE, TILEABLE_TYPE
29+
from ....core import FuseChunk, TILEABLE_TYPE, Chunk
3030
from ....typing import EntityType, TileableType, ChunkType
3131
from ....utils import copy_tileables, build_fetch
3232
from ...entity.tileables import handler
@@ -223,7 +223,7 @@ def _tile(
223223
chunks = []
224224
if need_process is not None:
225225
for t in need_process:
226-
if isinstance(t, CHUNK_TYPE):
226+
if isinstance(t, Chunk):
227227
chunks.append(self._get_data(t))
228228
elif isinstance(t, TILEABLE_TYPE):
229229
to_update_tileables.append(self._get_data(t))
@@ -304,7 +304,7 @@ def _iter(self):
304304
# so that fetch chunk can be generated.
305305
# Use chunk key as the key to make sure the copied chunk can be build to a fetch.
306306
processed_chunks = (
307-
c.chunk.key if isinstance(c, FUSE_CHUNK_TYPE) else c.key
307+
c.chunk.key if isinstance(c, FuseChunk) else c.key
308308
for c in chunk_graph.result_chunks
309309
)
310310
self._processed_chunks.update(processed_chunks)
@@ -406,7 +406,7 @@ def _process_node(self, entity: EntityType):
406406
if entity.key in self._processed_chunks:
407407
if entity not in self._chunk_to_fetch:
408408
# gen fetch
409-
fetch_chunk = build_fetch(entity).data
409+
fetch_chunk = build_fetch(entity)
410410
self._chunk_to_fetch[entity] = fetch_chunk
411411
return self._chunk_to_fetch[entity]
412412
return entity
@@ -417,7 +417,7 @@ def _select_inputs(self, inputs: List[ChunkType]):
417417
if inp.key in self._processed_chunks:
418418
# gen fetch
419419
if inp not in self._chunk_to_fetch:
420-
fetch_chunk = build_fetch(inp).data
420+
fetch_chunk = build_fetch(inp)
421421
self._chunk_to_fetch[inp] = fetch_chunk
422422
new_inputs.append(self._chunk_to_fetch[inp])
423423
else:

mars/core/operand/core.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,9 @@
2828
from ..context import Context
2929
from ..mode import is_eager_mode
3030
from ..entity import (
31-
OutputType,
31+
Chunk,
3232
ExecutableTuple,
33+
OutputType,
3334
get_chunk_types,
3435
get_tileable_types,
3536
get_output_types,
@@ -78,14 +79,14 @@ def _check_if_gpu(cls, inputs: List[TileableType]):
7879
def _tokenize_output(self, output_idx: int, **kw):
7980
return f"{self._key}_{output_idx}"
8081

81-
def _create_chunk(self, output_idx: int, index: Tuple[int], **kw) -> ChunkType:
82+
def _create_chunk(self, output_idx: int, index: Tuple[int], **kw) -> Chunk:
8283
output_type = kw.pop("output_type", None) or self._get_output_type(output_idx)
8384
if not output_type:
8485
raise ValueError("output_type should be specified")
8586

8687
if isinstance(output_type, (list, tuple)):
8788
output_type = output_type[output_idx]
88-
chunk_type, chunk_data_type = get_chunk_types(output_type)
89+
(chunk_data_type,) = get_chunk_types(output_type)
8990
kw["_i"] = output_idx
9091
kw["op"] = self
9192
kw["index"] = index
@@ -97,8 +98,7 @@ def _create_chunk(self, output_idx: int, index: Tuple[int], **kw) -> ChunkType:
9798
if "_key" not in kw:
9899
kw["_key"] = self._tokenize_output(output_idx, **kw)
99100

100-
data = chunk_data_type(**kw)
101-
return chunk_type(data)
101+
return chunk_data_type(**kw)
102102

103103
def _new_chunks(
104104
self, inputs: List[ChunkType], kws: List[Dict] = None, **kw
@@ -130,7 +130,7 @@ def _new_chunks(
130130
# for each output chunk, hold the reference to the other outputs
131131
# so that either no one or everyone are gc collected
132132
for j, t in enumerate(chunks):
133-
t.data._siblings = [c.data for c in chunks[:j] + chunks[j + 1 :]]
133+
t._siblings = [c for c in chunks[:j] + chunks[j + 1 :]]
134134
return chunks
135135

136136
def new_chunks(

mars/core/operand/fuse.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414

1515
from ... import opcodes
1616
from ...serialization.serializables import ReferenceField
17-
from ..entity import FuseChunk, FuseChunkData, NotSupportTile
17+
from ..entity import FuseChunk, NotSupportTile
1818
from ..graph import ChunkGraph
1919
from .base import Operand
2020

@@ -30,8 +30,7 @@ class FuseChunkMixin:
3030
__slots__ = ()
3131

3232
def _create_chunk(self, output_idx, index, **kw):
33-
data = FuseChunkData(_index=index, _op=self, **kw)
34-
return FuseChunk(data)
33+
return FuseChunk(_index=index, _op=self, **kw)
3534

3635
@classmethod
3736
def tile(cls, op):

mars/dataframe/arithmetic/core.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,9 @@
1919
import numpy as np
2020
import pandas as pd
2121

22-
from ...core import ENTITY_TYPE, CHUNK_TYPE, recursive_tile
22+
from ...core import ENTITY_TYPE, recursive_tile
2323
from ...serialization.serializables import AnyField
24-
from ...tensor.core import TENSOR_TYPE, TENSOR_CHUNK_TYPE, ChunkData, Chunk
24+
from ...tensor.core import TENSOR_TYPE, TENSOR_CHUNK_TYPE, Chunk
2525
from ...utils import classproperty, get_dtype
2626
from ..align import (
2727
align_series_series,
@@ -421,7 +421,7 @@ def _operator(self):
421421

422422
@classmethod
423423
def _calc_properties(cls, x1, x2=None, axis="columns"):
424-
is_chunk = isinstance(x1, CHUNK_TYPE)
424+
is_chunk = isinstance(x1, Chunk)
425425

426426
if isinstance(x1, (DATAFRAME_TYPE, DATAFRAME_CHUNK_TYPE)) and (
427427
x2 is None
@@ -625,7 +625,7 @@ def _new_chunks(self, inputs, kws=None, **kw):
625625
property_inputs = reversed(property_inputs)
626626
properties = self._calc_properties(*property_inputs)
627627

628-
inputs = [inp for inp in inputs if isinstance(inp, (Chunk, ChunkData))]
628+
inputs = [inp for inp in inputs if isinstance(inp, Chunk)]
629629

630630
shape = properties.pop("shape")
631631
if "shape" in kw:

0 commit comments

Comments
 (0)