Releases: pinecone-io/spark-pinecone

v1.2.0 Release

17 Dec 18:16
ac9c084

Added: Support for Stream Upserts with Structured Streaming

We’re excited to introduce support for upserts using Structured Streaming in the Pinecone Spark Connector! This enhancement allows users to seamlessly stream data into Pinecone for upsert operations.

Structured Streaming is a scalable and fault-tolerant stream processing engine built on Spark. With this capability, users can now handle continuous data streams with minimal latency, enabling real-time updates to Pinecone indexes.

Key Benefits:

  • Real-Time Data Ingestion: Effortlessly stream data from various sources, such as Kafka, file systems, or cloud object storage, directly into Pinecone for upserts.
  • Support for Iceberg Tables: Read from Apache Iceberg tables as part of your structured streaming pipelines, enabling easy integration with your data lakehouse.
  • Databricks Integration: Fully compatible with Databricks, allowing you to build and execute structured streaming pipelines seamlessly on the Databricks platform.
  • Checkpointing for Fault Tolerance: Use checkpointing to ensure stream reliability and prevent data loss during failures. Checkpoints store the progress and state of your stream, enabling seamless recovery without duplicates or missed data.
  • Scalability: Handle large-scale data streams while maintaining high throughput.
  • Ease of Use: Integrates seamlessly with Spark’s DataFrame and Dataset APIs.

Example:

Notes for Examples

  1. Databricks users should download the assembly jar to avoid dependency conflicts. The assembly jars can be found here: https://github.com/pinecone-io/spark-pinecone?tab=readme-ov-file#databricks-and-friends
  2. The input jsonl files used to read data from can be found here: https://github.com/pinecone-io/spark-pinecone/tree/main/src/it/resources
  3. Checkpointing: Ensure the checkpointLocation points to a durable and unique directory to enable fault tolerance and prevent duplicate processing.

The following example shows how to stream upsert records into Pinecone using PySpark:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, ArrayType, FloatType, StringType, LongType
import os

# Your API key and index name
api_key = "PINECONE_API_KEY"
index_name = "PINECONE_INDEX_NAME"
source_tag = "PINECONE_SOURCE_TAG"

COMMON_SCHEMA = StructType([
    StructField("id", StringType(), False),
    StructField("namespace", StringType(), True),
    StructField("values", ArrayType(FloatType(), False), False),
    StructField("metadata", StringType(), True),
    StructField("sparse_values", StructType([
        StructField("indices", ArrayType(LongType(), False), False),
        StructField("values", ArrayType(FloatType(), False), False)
    ]), True)
])

# Initialize Spark session
spark = SparkSession.builder \
    .appName("StreamUpsertExample") \
    .config("spark.sql.shuffle.partitions", 3) \
    .master("local") \
    .getOrCreate()

# Read the stream of JSON files, applying the schema from the input directory
lines = spark.readStream \
    .option("multiLine", True) \
    .option("mode", "PERMISSIVE") \
    .schema(COMMON_SCHEMA) \
    .json("path/to/input/directory/")

# Write the stream to Pinecone using the defined options
upsert = lines.writeStream \
    .format("io.pinecone.spark.pinecone.Pinecone") \
    .option("pinecone.apiKey", api_key) \
    .option("pinecone.indexName", index_name) \
    .option("pinecone.sourceTag", source_tag) \
    .option("checkpointLocation", "path/to/checkpoint/dir") \
    .outputMode("append") \
    .start()

upsert.awaitTermination()

The following example shows how to stream upsert records into Pinecone using scala-spark:

import io.pinecone.spark.pinecone.{COMMON_SCHEMA, PineconeOptions}
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}

object MainApp extends App {
  // Your API key, index name, and source tag
  val apiKey = "PINECONE_API_KEY"
  val indexName = "PINECONE_INDEX_NAME"
  val sourceTag = "PINECONE_SOURCE_TAG"

  // Create a Spark session
  val spark = SparkSession.builder()
    .appName("StreamUpsertExample")
    .config("spark.sql.shuffle.partitions", 3)
    .master("local")
    .getOrCreate()

  // Read the JSON files into a DataFrame, applying the COMMON_SCHEMA from input directory
  val lines = spark.readStream
    .option("multiLine", value = true)
    .option("mode", "PERMISSIVE")
    .schema(COMMON_SCHEMA)
    .json("path/to/input/directory/")

  // Define Pinecone options as a Map
  val pineconeOptions = Map(
    PineconeOptions.PINECONE_API_KEY_CONF -> apiKey,
    PineconeOptions.PINECONE_INDEX_NAME_CONF -> indexName,
    PineconeOptions.PINECONE_SOURCE_TAG_CONF -> sourceTag
  )

  // Write the stream to Pinecone using the defined options
  val upsert = lines
    .writeStream
    .format("io.pinecone.spark.pinecone.Pinecone")
    .options(pineconeOptions)
    .option("checkpointLocation", "path/to/checkpoint/dir")
    .outputMode("append")
    .start()

  upsert.awaitTermination()
}
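
The examples above read JSON files from a directory. The Kafka and Iceberg sources listed under Key Benefits feed the same Pinecone sink; below is a minimal Kafka-based sketch (the broker address, topic name, and environment variable names are assumptions for illustration, and the spark-sql-kafka package must be on the classpath):

import io.pinecone.spark.pinecone.{COMMON_SCHEMA, PineconeOptions}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.from_json

object KafkaToPineconeApp extends App {
  val spark = SparkSession.builder()
    .appName("KafkaStreamUpsertSketch")
    .master("local")
    .getOrCreate()

  import spark.implicits._

  // Parse each Kafka message value as a JSON record matching COMMON_SCHEMA
  val records = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092") // assumed broker address
    .option("subscribe", "pinecone-upserts")              // assumed topic name
    .load()
    .select(from_json($"value".cast("string"), COMMON_SCHEMA).as("record"))
    .select("record.*")

  // Upsert the parsed records into Pinecone, checkpointing for fault tolerance
  val query = records.writeStream
    .format("io.pinecone.spark.pinecone.Pinecone")
    .option(PineconeOptions.PINECONE_API_KEY_CONF, sys.env("PINECONE_API_KEY"))
    .option(PineconeOptions.PINECONE_INDEX_NAME_CONF, sys.env("PINECONE_INDEX"))
    .option("checkpointLocation", "path/to/checkpoint/dir")
    .outputMode("append")
    .start()

  query.awaitTermination()
}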

What's Changed

New Contributors

Full Changelog: v1.1.0...v1.2.0

v1.1.0 Release

09 Jul 14:56
d57f5c9

Added: Support for source tags

Partners can now add source tags to the spark connector.

Example:

The following example shows how to set source tag when upserting records into Pinecone using PySpark:

from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, ArrayType, FloatType, StringType, LongType

# Initialize Spark session with the spark-pinecone dependency
spark = SparkSession.builder.getOrCreate()

# Your API key, index name, and source tag
api_key = "PINECONE_API_KEY"
index_name = "PINECONE_INDEX_NAME"
source_tag = "PINECONE_SOURCE_TAG"

# Declare the schema
COMMON_SCHEMA = StructType([
    StructField("id", StringType(), False),
    StructField("namespace", StringType(), True),
    StructField("values", ArrayType(FloatType(), False), False),
    StructField("metadata", StringType(), True),
    StructField("sparse_values", StructType([
        StructField("indices", ArrayType(LongType(), False), False),
        StructField("values", ArrayType(FloatType(), False), False)
    ]), True)
])

# Read the file and apply the schema
df = spark.read \
    .option("multiLine", value = True) \
    .option("mode", "PERMISSIVE") \
    .schema(COMMON_SCHEMA) \
    .json("/FileStore/tables/sample-4.jsonl")

# Show if the read was successful
print("df count:", df.count(), "should be 7")
df.show()

# Write to Pinecone
df.write \
    .option("pinecone.apiKey", api_key) \
    .option("pinecone.indexName", index_name) \
    .option("pinecone.sourceTag", source_tag) \
    .format("io.pinecone.spark.pinecone.Pinecone") \
    .mode("append") \
    .save()

The following example shows how to set source tag when upserting records into Pinecone using scala-spark:

import io.pinecone.spark.pinecone.{COMMON_SCHEMA, PineconeOptions}
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}

object MainApp extends App {
  // Define the Pinecone API key, index name, and source tag
  val apiKey = "PINECONE_API_KEY"
  val indexName = "PINECONE_INDEX_NAME"
  val sourceTag = "PINECONE_SOURCE_TAG"

  // Configure Spark to run locally with all available cores
  val conf = new SparkConf().setMaster("local[*]")
  
  // Create a Spark session with the defined configuration
  val spark = SparkSession.builder().config(conf).getOrCreate()

  // Read the JSON file into a DataFrame, applying the COMMON_SCHEMA
  val df = spark.read
    .option("multiLine", value = true)
    .option("mode", "PERMISSIVE")
    .schema(COMMON_SCHEMA)
    .json("src/test/resources/sample.jsonl") // path to sample.jsonl

  // Define Pinecone options as a Map
  val pineconeOptions = Map(
    PineconeOptions.PINECONE_API_KEY_CONF -> apiKey,
    PineconeOptions.PINECONE_INDEX_NAME_CONF -> indexName,
    PineconeOptions.PINECONE_SOURCE_TAG_CONF -> sourceTag
  )

  // Show if the read was successful
  println(s"df count: ${df.count()} should be 7")
  df.show(df.count().toInt)

  // Write the DataFrame to Pinecone using the defined options
  df.write
    .options(pineconeOptions)
    .format("io.pinecone.spark.pinecone.Pinecone")
    .mode(SaveMode.Append)
    .save()
}

Updated: Metadata size to 40 KB

Previously, users could only upsert records with up to 5 KB of metadata. With this release, each record can carry up to 40 KB of metadata.
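
As a quick sanity check before writing, here is a small sketch (not part of the connector's API; it assumes metadata is supplied as a JSON string, matching the StringType metadata field in COMMON_SCHEMA):

// Sketch: verify the serialized metadata stays under the new 40 KB limit
val metadataJson = """{"genre": "news", "tags": ["spark", "pinecone"], "year": 2024}"""
require(metadataJson.getBytes("UTF-8").length <= 40 * 1024, "metadata exceeds 40 KB")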

Updated: Pinecone java sdk client v1.0.0 to v1.2.2

The spark connector relies on the Pinecone Java SDK, and as part of this release we have updated the Java SDK client from v1.0.0 to v1.2.2.

What's Changed

Full Changelog: v1.0.0...v1.1.0

v1.0.0 Release

15 May 23:30

Added: Support for serverless indexes

Previously, users could only upsert records into pod-based indexes. With this release, users can upsert records into serverless indexes as well.

Updated: Datatype of sparse indices from signed 32-bit integers to unsigned 32-bit integers

The expected datatype of sparse indices in Pinecone's backend API is unsigned 32-bit integers, while the spark connector used to accept signed 32-bit integers. To address this limitation, sparse indices now accept Long (instead of Int), with an input range of [0, 2^32 - 1]. Anything outside this range throws an IllegalArgumentException.
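
For illustration, here is a minimal sketch (the record contents are assumed, not taken from this release) showing sparse indices at the top of the accepted range:

import io.pinecone.spark.pinecone.COMMON_SCHEMA
import org.apache.spark.sql.{Row, SparkSession}

val spark = SparkSession.builder().master("local").getOrCreate()

// Sparse indices are read as Long, so the full unsigned 32-bit range is representable.
val rows = java.util.Arrays.asList(
  Row("v1", "default", Seq(1.0f, 2.0f, 3.0f), null,
    Row(Seq(0L, 4294967295L), Seq(5.5f, 5.0f))) // 4294967295 == 2^32 - 1, the largest accepted index
)
val df = spark.createDataFrame(rows, COMMON_SCHEMA)
df.show(false)

// An index outside [0, 2^32 - 1], e.g. -1L or 4294967296L, is expected to fail
// with an IllegalArgumentException when the DataFrame is written to Pinecone.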

Removed: projectName and environment variables

Users are no longer required to provide projectName and environment when upserting records. Endpoint resolution is handled by the underlying Java SDK without either variable; apiKey and indexName are the only required parameters.

Updated: Pinecone java sdk client v0.7.4 to v1.0.0

The spark connector relies on the Pinecone Java SDK, and as part of this release we have updated the Java SDK client from v0.7.4 to v1.0.0.

Example:

The following example shows how to upsert records into Pinecone using scala-spark:

import io.pinecone.spark.pinecone.{COMMON_SCHEMA, PineconeOptions}
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}

object MainApp extends App {
  val conf = new SparkConf()
    .setMaster("local[*]")

  val apiKey = "PINECONE_API_KEY"
  val indexName = "PINECONE_INDEX_NAME"

  val spark = SparkSession
    .builder()
    .config(conf)
    .getOrCreate()

  val df = spark.read
    .option("multiLine", value = true)
    .option("mode", "PERMISSIVE")
    .schema(COMMON_SCHEMA)
    .json("path_to_sample.jsonl") 
    .repartition(2)

  val pineconeOptions = Map(
    PineconeOptions.PINECONE_API_KEY_CONF -> apiKey,
    PineconeOptions.PINECONE_INDEX_NAME_CONF -> indexName
  )

  df.write
    .options(pineconeOptions)
    .format("io.pinecone.spark.pinecone.Pinecone")
    .mode(SaveMode.Append)
    .save()
}

The following example shows how to upsert records into Pinecone using python-spark:

from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, ArrayType, FloatType, StringType, LongType

# Your API key and index name
api_key = "PINECONE_API_KEY"
index_name = "PINECONE_INDEX_NAME"

COMMON_SCHEMA = StructType([
    StructField("id", StringType(), False),
    StructField("namespace", StringType(), True),
    StructField("values", ArrayType(FloatType(), False), False),
    StructField("metadata", StringType(), True),
    StructField("sparse_values", StructType([
        StructField("indices", ArrayType(LongType(), False), False),
        StructField("values", ArrayType(FloatType(), False), False)
    ]), True)
])

# Initialize Spark
spark = SparkSession.builder.getOrCreate()

# Read the file and apply the schema
df = spark.read \
    .option("multiLine", value = True) \
    .option("mode", "PERMISSIVE") \
    .schema(COMMON_SCHEMA) \
    .json("/FileStore/tables/sample-4.jsonl")

# Show if the read was successful
df.show()

df.write \
    .option("pinecone.apiKey", api_key) \
    .option("pinecone.indexName", index_name) \
    .format("io.pinecone.spark.pinecone.Pinecone") \
    .mode("append") \
    .save()

The sample.jsonl file used as input in the above scala-spark and python-spark examples is shown below.

[
  {
    "id": "v1",
    "namespace": "default",
    "values": [
      1,
      2,
      3
    ],
    "metadata": {
      "hello": [
        "world",
        "you"
      ],
      "numbers": "or not",
      "actual_number": 5.2,
      "round": 3
    },
    "sparse_values": {
      "indices": [
        0,
        2
      ],
      "values": [ 
        5.5,
        5
      ]
    }
  },
  {
    "id": "v2",
    "values": [
      3,
      2,
      1
    ]
  },
  {
    "id": "v3",
    "values": [
      1,
      4,
      9
    ],
    "namespace": "default"
  }
]

Databricks users:

Please import the spark-pinecone assembly jar from S3: s3://pinecone-jars/1.0.0/spark-pinecone-uberjar.jar.

What's Changed

  • Add s3 v0.2.2 assembly jar url by @rohanshah18 in #25
  • Update java sdk to v1 to add support for serverless indexes and accept sparse indices within the range of unsigned 32-bit integers by @rohanshah18 in #31
  • Update README for v1 by @rohanshah18 in #32

Full Changelog: v0.2.2...v1.0.0

v0.2.2 Release

25 Jan 17:03
02ac8e9

Updated: Pinecone java sdk client v0.6.0 to v0.7.4

The spark connector relies on the Pinecone Java SDK, and as part of this release we have updated the Java SDK client from v0.6.0 to v0.7.4. Pinecone Java SDK v0.6.0 had index creation issues, which are resolved in v0.7.4. Even though the spark connector doesn't offer index creation, users can import the Pinecone Java SDK client into their sbt or gradle project alongside the spark connector for additional control and data plane operations.
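
For reference, here is a build.sbt sketch of pulling in both libraries together (the coordinates and versions shown are assumptions; verify them against Maven Central):

// build.sbt sketch: the connector handles bulk upserts from Spark, while the
// Java SDK client can be used directly for index creation and other operations.
libraryDependencies ++= Seq(
  "io.pinecone" %% "spark-pinecone" % "0.2.2",
  "io.pinecone" % "pinecone-client" % "0.7.4"
)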

Databricks users:

Please import the spark-pinecone assembly jar from S3: s3://pinecone-jars/0.2.2/spark-pinecone-uberjar.jar.

What's Changed

Full Changelog: v0.2.1...v0.2.2

v0.2.1 Release

25 Oct 18:50
6150c65

Added: Upsert Sparse Vectors

We have added support for inserting or updating sparse vectors in the spark-pinecone connector. The basic vector type in Pinecone is a dense vector. Pinecone also supports vectors with sparse and dense values together, which allows users to perform hybrid search on their Pinecone index. Hybrid search combines semantic and keyword search in one query for more relevant results.

Example:

The following example shows how to upsert sparse-dense vectors into Pinecone.

import io.pinecone.spark.pinecone.{COMMON_SCHEMA, PineconeOptions}
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}

object MainApp extends App {
  // Your API key, environment, project name, and index name
  val apiKey = "PINECONE_API_KEY"
  val environment = "PINECONE_ENVIRONMENT"
  val projectName = "PINECONE_PROJECT_NAME"
  val indexName = "PINECONE_INDEX_NAME"

  val conf = new SparkConf()
    .setMaster("local[*]")

  val spark = SparkSession
    .builder()
    .config(conf)
    .getOrCreate()

  val df = spark.read
    .option("multiLine", value = true)
    .option("mode", "PERMISSIVE")
    .schema(COMMON_SCHEMA)
    .json("path_to_sample.jsonl") 
    .repartition(2)

  val pineconeOptions = Map(
    PineconeOptions.PINECONE_API_KEY_CONF -> apiKey,
    PineconeOptions.PINECONE_ENVIRONMENT_CONF -> environment,
    PineconeOptions.PINECONE_PROJECT_NAME_CONF -> projectName,
    PineconeOptions.PINECONE_INDEX_NAME_CONF -> indexName
  )

  df.write
    .options(pineconeOptions)
    .format("io.pinecone.spark.pinecone.Pinecone")
    .mode(SaveMode.Append)
    .save()
}

The sample.jsonl file used as input in the above Scala code is shown in the next example.

Added: Optional Fields for Input

With this release, the namespace, metadata, and sparse_values fields in the input JSON are optional. Users can now choose not to include these fields; the only mandatory fields are id and values. Please note that if you include the sparse_values field in the input, both indices and values within sparse_values must be present. The following sample.jsonl file illustrates the combinations of input vectors supported by the schema.

[
  {
    "id": "v1",
    "namespace": "default",
    "values": [
      1,
      2,
      3
    ],
    "metadata": {
      "hello": [
        "world",
        "you"
      ],
      "numbers": "or not",
      "actual_number": 5.2,
      "round": 3
    },
    "sparse_values": {
      "indices": [
        0,
        2
      ],
      "values": [ 
        5.5,
        5
      ]
    }
  },
  {
    "id": "v2",
    "values": [
      3,
      2,
      1
    ]
  },
  {
    "id": "v3",
    "values": [
      1,
      4,
      9
    ],
    "namespace": "default"
  }
]

Databricks users:

Please import the spark-pinecone assembly jar from S3: s3://pinecone-jars/0.2.1/spark-pinecone-uberjar.jar.

What's Changed

Full Changelog: v0.1.4...v0.2.1

v0.1.4 Release

18 Oct 14:57
169aef1

Added: Publish assembly jar to Maven Central

We have temporarily added support for publishing the assembly jar to Maven Central, but the recommended way to obtain the assembly jar for Scala 2.12 on the Databricks platform is from the S3 bucket: s3://pinecone-jars/spark-pinecone-uberjar.jar

What's Changed

Full Changelog: v0.1.3...v0.1.4

v0.1.3 Release

17 Oct 21:47

Fixed: Dependency Cleanup and Compatibility Enhancement

Resolved dependency conflicts by updating the Spark dependency to version 3.5.0 and the Java SDK dependency to version 0.6.0, which now internally relies on gRPC v1.58.0 and netty 2.0.61.Final.

These changes resolve the dependency version conflicts seen in sbt projects as well as Databricks notebooks. Users on the Databricks platform are advised to use the assembly jar from the S3 location: <s3_link>.

Added: Apache 2.0 license

The pinecone-spark connector was already an open-source project; we have now added the Apache 2.0 license, formally establishing its open-source status.

What's Changed

  • Apache 2.0 License by @gdj0nes in #8
  • Update spark and java sdk versions to resolve dependency conflicts by @rohanshah18 in #10

Full Changelog: v0.1.1...v0.1.2

v0.1.1

13 Jan 18:08
Releasing 0.1.1

v0.1.0 - Initial release

16 Mar 20:38

Initial release of this project, see the README for more details.