Skip to content

Commit

Permalink
Unify data-ingest and parallel-workload data types
Browse files Browse the repository at this point in the history
  • Loading branch information
def- committed Oct 19, 2023
1 parent 5e8e272 commit 00b2e1a
Show file tree
Hide file tree
Showing 9 changed files with 287 additions and 292 deletions.
258 changes: 210 additions & 48 deletions misc/python/materialize/data_ingest/data_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,14 @@
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

import json
import random
import string
from enum import Enum
from typing import Any

from pg8000.native import literal


class RecordSize(Enum):
TINY = 1
Expand All @@ -22,146 +25,305 @@ class RecordSize(Enum):

class Backend(Enum):
AVRO = 1
POSTGRES = 2
JSON = 2
POSTGRES = 3


class DataType:
"""As supported by Avro: https://avro.apache.org/docs/1.11.1/specification/_print/"""

@staticmethod
def random_value(record_size: RecordSize) -> Any:
def random_value(
rng: random.Random,
record_size: RecordSize = RecordSize.LARGE,
in_query: bool = False,
) -> Any:
"""Generate a random value, should be possible for all types."""
raise NotImplementedError

@staticmethod
def numeric_value(num: int) -> Any:
def numeric_value(num: int, in_query: bool = False) -> Any:
"""Generate a value that corresponds to `num`, so that it will always be the same value for the same input `num`, but fits into the type. This doesn't make sense for a type like boolean."""
raise NotImplementedError

@staticmethod
def name(backend: Backend) -> str:
def name(backend: Backend = Backend.POSTGRES) -> str:
raise NotImplementedError


class NullType(DataType):
class Boolean(DataType):
@staticmethod
def random_value(record_size: RecordSize) -> Any:
return None
def random_value(
rng: random.Random,
record_size: RecordSize = RecordSize.LARGE,
in_query: bool = False,
) -> Any:
return random.choice((True, False))

@staticmethod
def name(backend: Backend = Backend.POSTGRES) -> str:
return "boolean"

class BooleanType(DataType):

class SmallInt(DataType):
@staticmethod
def random_value(record_size: RecordSize) -> Any:
return random.choice((True, False))
def random_value(
rng: random.Random,
record_size: RecordSize = RecordSize.LARGE,
in_query: bool = False,
) -> Any:
if record_size == RecordSize.TINY:
min, max = -127, 128
elif record_size in (RecordSize.SMALL, RecordSize.MEDIUM, RecordSize.LARGE):
min, max = -32768, 32767
else:
raise ValueError(f"Unexpected record size {record_size}")

if rng.randrange(10) == 0:
return min
if rng.randrange(10) == 0:
return max
return rng.randint(min, max)

@staticmethod
def name(backend: Backend) -> str:
return "boolean"
def numeric_value(num: int, in_query: bool = False) -> Any:
return num

@staticmethod
def name(backend: Backend = Backend.POSTGRES) -> str:
if backend == Backend.AVRO:
return "int" # no explicit support in AVRO
elif backend == Backend.JSON:
return "integer" # no explicit support in JSON
else:
return "smallint"


class IntType(DataType):
class Int(DataType):
@staticmethod
def random_value(record_size: RecordSize) -> Any:
def random_value(
rng: random.Random,
record_size: RecordSize = RecordSize.LARGE,
in_query: bool = False,
) -> Any:
if record_size == RecordSize.TINY:
return random.randint(-127, 128)
min, max = -127, 128
elif record_size == RecordSize.SMALL:
return random.randint(-32768, 32767)
min, max = -32768, 32767
elif record_size in (RecordSize.MEDIUM, RecordSize.LARGE):
return random.randint(-2147483648, 2147483647)
min, max = -2147483648, 2147483647
else:
raise ValueError(f"Unexpected record size {record_size}")

if rng.randrange(10) == 0:
return min
if rng.randrange(10) == 0:
return max
return rng.randint(min, max)

@staticmethod
def numeric_value(num: int) -> Any:
def numeric_value(num: int, in_query: bool = False) -> Any:
return num

@staticmethod
def name(backend: Backend) -> str:
return "int"
def name(backend: Backend = Backend.POSTGRES) -> str:
if backend == Backend.JSON:
return "integer"
else:
return "int"


class LongType(DataType):
class Long(DataType):
@staticmethod
def random_value(record_size: RecordSize) -> Any:
def random_value(
rng: random.Random,
record_size: RecordSize = RecordSize.LARGE,
in_query: bool = False,
) -> Any:
if record_size == RecordSize.TINY:
return random.randint(-127, 128)
min, max = -127, 128
elif record_size == RecordSize.SMALL:
return random.randint(-32768, 32767)
min, max = -32768, 32767
elif record_size == RecordSize.MEDIUM:
return random.randint(-2147483648, 2147483647)
min, max = -2147483648, 2147483647
elif record_size == RecordSize.LARGE:
return random.randint(-9223372036854775808, 9223372036854775807)
min, max = -9223372036854775808, 9223372036854775807
else:
raise ValueError(f"Unexpected record size {record_size}")

if rng.randrange(10) == 0:
return min
if rng.randrange(10) == 0:
return max
return rng.randint(min, max)

@staticmethod
def numeric_value(num: int) -> Any:
def numeric_value(num: int, in_query: bool = False) -> Any:
return num

@staticmethod
def name(backend: Backend) -> str:
def name(backend: Backend = Backend.POSTGRES) -> str:
if backend == Backend.AVRO:
return "long"
elif backend == Backend.JSON:
return "integer"
else:
return "bigint"


class FloatType(DataType):
class Float(DataType):
@staticmethod
def random_value(record_size: RecordSize) -> Any:
def random_value(
rng: random.Random,
record_size: RecordSize = RecordSize.LARGE,
in_query: bool = False,
) -> Any:
if rng.randrange(10) == 0:
return 1.0
if rng.randrange(10) == 0:
return 0.0

if record_size == RecordSize.TINY:
return random.random()
return rng.random()
elif record_size == RecordSize.SMALL:
return random.uniform(-100, 100)
return rng.uniform(-100, 100)
elif record_size == RecordSize.MEDIUM:
return random.uniform(-1_000_000, 1_000_000)
return rng.uniform(-1_000_000, 1_000_000)
elif record_size == RecordSize.LARGE:
return random.uniform(-1_000_000_000, 1_000_000_000_00)
return rng.uniform(-1_000_000_000, 1_000_000_000_00)
else:
raise ValueError(f"Unexpected record size {record_size}")

@staticmethod
def numeric_value(num: int) -> Any:
def numeric_value(num: int, in_query: bool = False) -> Any:
return num

@staticmethod
def name(backend: Backend) -> str:
def name(backend: Backend = Backend.POSTGRES) -> str:
if backend == Backend.AVRO:
return "float"
elif backend == Backend.JSON:
return "number"
else:
return "float4"


class DoubleType(FloatType):
class Double(Float):
@staticmethod
def name(backend: Backend) -> str:
def name(backend: Backend = Backend.POSTGRES) -> str:
if backend == Backend.AVRO:
return "double"
elif backend == Backend.JSON:
return "number"
else:
return "float8"


class StringType(DataType):
class Text(DataType):
@staticmethod
def random_value(record_size: RecordSize) -> Any:
def random_value(
rng: random.Random,
record_size: RecordSize = RecordSize.LARGE,
in_query: bool = False,
) -> Any:
if rng.randrange(10) == 0:
result = rng.choice(
[
# "NULL", # TODO: Reenable after #21937 is fixed
"0.0",
"True",
# "",
"表ポあA鷗ŒéB逍Üߪąñ丂㐀𠀀",
rng.randint(-100, 100),
]
)
# Fails: unterminated dollar-quoted string
# chars = string.printable
chars = string.ascii_letters + string.digits
if record_size == RecordSize.TINY:
return random.choice(("foo", "bar", "baz"))
result = random.choice(("foo", "bar", "baz"))
elif record_size == RecordSize.SMALL:
return "".join(random.choice(string.printable) for _ in range(3))
result = "".join(random.choice(chars) for _ in range(3))
elif record_size == RecordSize.MEDIUM:
return "".join(random.choice(string.printable) for _ in range(10))
result = "".join(random.choice(chars) for _ in range(10))
elif record_size == RecordSize.LARGE:
return "".join(random.choice(string.printable) for _ in range(100))
result = "".join(random.choice(chars) for _ in range(100))
else:
raise ValueError(f"Unexpected record size {record_size}")

return literal(str(result)) if in_query else str(result)

@staticmethod
def numeric_value(num: int) -> Any:
return f"key{num}"
def numeric_value(num: int, in_query: bool = False) -> Any:
result = f"key{num}"
return f"'{result}'" if in_query else str(result)

@staticmethod
def name(backend: Backend) -> str:
def name(backend: Backend = Backend.POSTGRES) -> str:
if backend == Backend.POSTGRES:
return "text"
else:
return "string"


class Bytea(Text):
@staticmethod
def name(backend: Backend = Backend.POSTGRES) -> str:
if backend == Backend.AVRO:
return "bytes"
elif backend == Backend.JSON:
return "string"
else:
return "text"
return "bytea"


class Jsonb(DataType):
@staticmethod
def name(backend: Backend = Backend.POSTGRES) -> str:
if backend == Backend.AVRO:
return "record"
elif backend == Backend.JSON:
return "object"
else:
return "jsonb"

@staticmethod
def random_value(
rng: random.Random,
record_size: RecordSize = RecordSize.LARGE,
in_query: bool = False,
) -> Any:
if record_size == RecordSize.TINY:
key_range = 1
elif record_size == RecordSize.SMALL:
key_range = 5
elif record_size == RecordSize.MEDIUM:
key_range = 10
elif record_size == RecordSize.LARGE:
key_range = 20
else:
raise ValueError(f"Unexpected record size {record_size}")
result = {f"key{key}": str(rng.randint(-100, 100)) for key in range(key_range)}
return f"'{json.dumps(result)}'::jsonb" if in_query else json.dumps(result)

@staticmethod
def numeric_value(num: int, in_query: bool = False) -> Any:
result = {f"key{num}": str(num)}
return literal(json.dumps(result)) if in_query else json.dumps(result)


class TextTextMap(Jsonb):
@staticmethod
def name(backend: Backend = Backend.POSTGRES) -> str:
if backend == Backend.AVRO:
return "record"
elif backend == Backend.JSON:
return "object"
else:
return "map[text=>text]"


DATA_TYPES = DataType.__subclasses__()

# fastavro._schema_common.UnknownType: record
DATA_TYPES_FOR_AVRO = list(set(DATA_TYPES) - {TextTextMap, Jsonb, Boolean})
Loading

0 comments on commit 00b2e1a

Please sign in to comment.