import re
from enum import Enum
from pathlib import Path
from typing import Any, Dict, List
import yaml
from pydantic.v1 import BaseModel, Field, root_validator
from redis.commands.search.field import Field as RedisField
from redisvl.schema.fields import BaseField, FieldFactory
from redisvl.utils.log import get_logger
from redisvl.utils.utils import model_to_dict
logger = get_logger(__name__)
SCHEMA_VERSION = "0.1.0"
class StorageType(Enum):
"""
Enumeration for the storage types supported in Redis.
Attributes:
HASH (str): Represents the 'hash' storage type in Redis.
JSON (str): Represents the 'json' storage type in Redis.
"""
HASH = "hash"
JSON = "json"
class IndexInfo(BaseModel):
"""Index info includes the essential details regarding index settings,
such as its name, prefix, key separator, and storage type in Redis.
In yaml format, the index info section looks like:
.. code-block:: yaml
index:
name: user-index
prefix: user
key_separtor: ':'
storage_type: json
In dict format, the index info section looks like:
.. code-block:: python
{"index": {
"name": "user-index",
"prefix": "user",
"key_separator": ":",
"storage_type": "json"
}}
"""
name: str
"""The unique name of the index."""
prefix: str = "rvl"
"""The prefix used for Redis keys associated with this index."""
key_separator: str = ":"
"""The separator character used in designing Redis keys."""
storage_type: StorageType = StorageType.HASH
"""The storage type used in Redis (e.g., 'hash' or 'json')."""
[docs]
class IndexSchema(BaseModel):
"""A schema definition for a search index in Redis, used in RedisVL for
configuring index settings and organizing vector and metadata fields.
The class offers methods to create an index schema from a YAML file or a
Python dictionary, supporting flexible schema definitions and easy
integration into various workflows.
An example `schema.yaml` file might look like this:
.. code-block:: yaml
version: '0.1.0'
index:
name: user-index
prefix: user
key_separator: ":"
storage_type: json
fields:
- name: user
type: tag
- name: credit_score
type: tag
- name: embedding
type: vector
attrs:
algorithm: flat
dims: 3
distance_metric: cosine
datatype: float32
Loading the schema for RedisVL from yaml is as simple as:
.. code-block:: python
from redisvl.schema import IndexSchema
schema = IndexSchema.from_yaml("schema.yaml")
Loading the schema for RedisVL from dict is as simple as:
.. code-block:: python
from redisvl.schema import IndexSchema
schema = IndexSchema.from_dict({
"index": {
"name": "user-index",
"prefix": "user",
"key_separator": ":",
"storage_type": "json",
},
"fields": [
{"name": "user", "type": "tag"},
{"name": "credit_score", "type": "tag"},
{
"name": "embedding",
"type": "vector",
"attrs": {
"algorithm": "flat",
"dims": 3,
"distance_metric": "cosine",
"datatype": "float32"
}
}
]
})
Note:
The `fields` attribute in the schema must contain unique field names to ensure
correct and unambiguous field references.
"""
index: IndexInfo
"""Details of the basic index configurations."""
fields: Dict[str, BaseField] = {}
"""Fields associated with the search index and their properties"""
version: str = Field(default=SCHEMA_VERSION, const=True)
"""Version of the underlying index schema."""
@staticmethod
def _make_field(storage_type, **field_inputs) -> BaseField:
"""
Parse raw field inputs derived from YAML or dict.
Validates and sets the 'path' attribute for fields when using JSON storage type.
"""
# Create field from inputs
field = FieldFactory.create_field(**field_inputs)
# Handle field path and storage type
if storage_type == StorageType.JSON:
field.path = field.path if field.path else f"$.{field.name}"
else:
if field.path is not None:
logger.warning(
f"Path attribute for field '{field.name}' will be ignored for HASH storage type."
)
field.path = None
return field
@root_validator(pre=True)
@classmethod
def validate_and_create_fields(cls, values):
"""
Validate uniqueness of field names and create valid field instances.
"""
# Ensure index is a dictionary for validation
index = values.get("index")
if not isinstance(index, IndexInfo):
index = IndexInfo(**index)
input_fields = values.get("fields", [])
prepared_fields: Dict[str, BaseField] = {}
# Handle old fields format temporarily
if isinstance(input_fields, dict):
raise ValueError("New schema format introduced; please update schema spec.")
# Process and create fields
for field_input in input_fields:
field = cls._make_field(index.storage_type, **field_input)
if field.name in prepared_fields:
raise ValueError(
f"Duplicate field name: {field.name}. Field names must be unique across all fields."
)
prepared_fields[field.name] = field
values["fields"] = prepared_fields
values["index"] = index
return values
[docs]
@classmethod
def from_yaml(cls, file_path: str) -> "IndexSchema":
"""Create an IndexSchema from a YAML file.
Args:
file_path (str): The path to the YAML file.
Returns:
IndexSchema: The index schema.
.. code-block:: python
from redisvl.schema import IndexSchema
schema = IndexSchema.from_yaml("schema.yaml")
"""
try:
fp = Path(file_path).resolve()
except OSError as e:
raise ValueError(f"Invalid file path: {file_path}") from e
if not fp.exists():
raise FileNotFoundError(f"Schema file {file_path} does not exist")
with open(fp, "r") as f:
yaml_data = yaml.safe_load(f)
return cls(**yaml_data)
[docs]
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "IndexSchema":
"""Create an IndexSchema from a dictionary.
Args:
data (Dict[str, Any]): The index schema data.
Returns:
IndexSchema: The index schema.
.. code-block:: python
from redisvl.schema import IndexSchema
schema = IndexSchema.from_dict({
"index": {
"name": "docs-index",
"prefix": "docs",
"storage_type": "hash",
},
"fields": [
{
"name": "doc-id",
"type": "tag"
},
{
"name": "doc-embedding",
"type": "vector",
"attrs": {
"algorithm": "flat",
"dims": 1536
}
}
]
})
"""
return cls(**data)
@property
def field_names(self) -> List[str]:
"""A list of field names associated with the index schema.
Returns:
List[str]: A list of field names from the schema.
"""
return list(self.fields.keys())
@property
def redis_fields(self) -> List[RedisField]:
"""A list of core redis-py field definitions based on the
current schema fields.
Converts RedisVL field definitions into a format suitable for use with
redis-py, facilitating the creation and management of index structures in
the Redis database.
Returns:
List[RedisField]: A list of redis-py field definitions.
"""
redis_fields: List[RedisField] = [
field.as_redis_field() for _, field in self.fields.items()
]
return redis_fields
[docs]
def add_field(self, field_inputs: Dict[str, Any]):
"""Adds a single field to the index schema based on the specified field
type and attributes.
This method allows for the addition of individual fields to the schema,
providing flexibility in defining the structure of the index.
Args:
field_inputs (Dict[str, Any]): A field to add.
Raises:
ValueError: If the field name or type are not provided or if the name
already exists within the schema.
.. code-block:: python
# Add a tag field
schema.add_field({"name": "user", "type": "tag})
# Add a vector field
schema.add_field({
"name": "user-embedding",
"type": "vector",
"attrs": {
"dims": 1024,
"algorithm": "flat",
"datatype": "float32"
}
})
"""
# Parse field inputs
field = self._make_field(self.index.storage_type, **field_inputs)
# Check for duplicates
if field.name in self.fields:
raise ValueError(
f"Duplicate field name: {field.name}. Field names must be unique across all fields for this index."
)
# Add field
self.fields[field.name] = field
[docs]
def add_fields(self, fields: List[Dict[str, Any]]):
"""Extends the schema with additional fields.
This method allows dynamically adding new fields to the index schema. It
processes a list of field definitions.
Args:
fields (List[Dict[str, Any]]): A list of fields to add.
Raises:
ValueError: If a field with the same name already exists in the
schema.
.. code-block:: python
schema.add_fields([
{"name": "user", "type": "tag"},
{"name": "bio", "type": "text"},
{
"name": "user-embedding",
"type": "vector",
"attrs": {
"dims": 1024,
"algorithm": "flat",
"datatype": "float32"
}
}
])
"""
for field in fields:
self.add_field(field)
[docs]
def remove_field(self, field_name: str):
"""Removes a field from the schema based on the specified name.
This method is useful for dynamically altering the schema by removing
existing fields.
Args:
field_name (str): The name of the field to be removed.
"""
if field_name not in self.fields:
logger.warning(f"Field '{field_name}' does not exist in the schema")
return
del self.fields[field_name]
def generate_fields(
self,
data: Dict[str, Any],
strict: bool = False,
ignore_fields: List[str] = [],
) -> List[Dict[str, Any]]:
"""Generates a list of extracted field specs from a sample data point.
This method simplifies the process of creating a schema by inferring
field types and attributes from sample data. It's particularly useful
during the development process while dealing with datasets containing
numerous fields, reducing the need for manual specification.
Args:
data (Dict[str, Any]): Sample data used to infer field definitions.
strict (bool, optional): If True, raises an error on failing to
infer a field type. Defaults to False.
ignore_fields (List[str], optional): A list of field names to
exclude from processing. Defaults to an empty list.
Returns:
Dict[str, List[Dict[str, Any]]]: A dictionary with inferred field
types and attributes.
Notes:
- Vector fields are not generated by this method.
- This method employs heuristics and may not always correctly infer
field types.
"""
fields: List[Dict[str, Any]] = []
for field_name, value in data.items():
if field_name in ignore_fields:
continue
try:
field_type = TypeInferrer.infer(value)
fields.append(
FieldFactory.create_field(
field_type,
field_name,
).dict()
)
except ValueError as e:
if strict:
raise
else:
logger.warn(
message=f"Error inferring field type for {field_name}: {e}"
)
return fields
[docs]
def to_dict(self) -> Dict[str, Any]:
"""Serialize the index schema model to a dictionary, handling Enums
and other special cases properly.
Returns:
Dict[str, Any]: The index schema as a dictionary.
"""
dict_schema = model_to_dict(self)
# cast fields back to a pure list
dict_schema["fields"] = [
field for field_name, field in dict_schema["fields"].items()
]
return dict_schema
[docs]
def to_yaml(self, file_path: str, overwrite: bool = True) -> None:
"""Write the index schema to a YAML file.
Args:
file_path (str): The path to the YAML file.
overwrite (bool): Whether to overwrite the file if it already exists.
Raises:
FileExistsError: If the file already exists and overwrite is False.
"""
fp = Path(file_path).resolve()
if fp.exists() and not overwrite:
raise FileExistsError(f"Schema file {file_path} already exists.")
with open(fp, "w") as f:
yaml_data = self.to_dict()
yaml.dump(yaml_data, f, sort_keys=False)
class TypeInferrer:
"""Infers the type of a field based on its value."""
GEO_PATTERN = re.compile(
r"^\s*[-+]?([1-8]?\d(\.\d+)?|90(\.0+)?),\s*[-+]?(180(\.0+)?|((1[0-7]\d)|([1-9]?\d))(\.\d+)?)\s*$"
)
TYPE_METHOD_MAP = {
"numeric": "_is_numeric",
"geo": "_is_geographic",
"tag": "_is_tag",
"text": "_is_text",
}
@classmethod
def infer(cls, value: Any) -> str:
"""Infers the field type for a given value.
Args:
value: The value to infer the type of.
Returns:
The inferred field type as a string.
Raises:
ValueError: If the type cannot be inferred.
"""
for type_name, method_name in cls.TYPE_METHOD_MAP.items():
if getattr(cls, method_name)(value):
return type_name
raise ValueError(f"Unable to infer type for value: {value}")
@classmethod
def _is_numeric(cls, value: Any) -> bool:
"""Check if the value is numeric."""
if not isinstance(value, (int, float, str)):
return False
try:
float(value)
return True
except (ValueError, TypeError):
return False
@classmethod
def _is_tag(cls, value: Any) -> bool:
"""Check if the value is a tag."""
return isinstance(value, (list, set, tuple)) and all(
isinstance(v, str) for v in value
)
@classmethod
def _is_text(cls, value: Any) -> bool:
"""Check if the value is text."""
return isinstance(value, str)
@classmethod
def _is_geographic(cls, value: Any) -> bool:
"""Check if the value is a geographic coordinate."""
return isinstance(value, str) and cls.GEO_PATTERN.match(value) is not None