Releases: pinecone-io/spark-pinecone
v1.2.0 Release
Added: Support for Stream Upserts with Structured Streaming
We’re excited to introduce support for upserts using Structured Streaming in the Pinecone Spark Connector! This enhancement allows users to seamlessly stream data into Pinecone for upsert operations.
Structured Streaming is a scalable and fault-tolerant stream processing engine built on Spark. With this capability, users can now handle continuous data streams with minimal latency, enabling real-time updates to Pinecone indexes.
Key Benefits:
- Real-Time Data Ingestion: Effortlessly stream data from various sources, such as Kafka, file systems, or cloud object storage, directly into Pinecone for upserts.
- Support for Iceberg Tables: Read from Apache Iceberg tables as part of your structured streaming pipelines, enabling easy integration with your data lakehouse (see the sketch after this list).
- Databricks Integration: Fully compatible with Databricks, allowing you to build and execute structured streaming pipelines seamlessly on the Databricks platform.
- Checkpointing for Fault Tolerance: Use checkpointing to ensure stream reliability and prevent data loss during failures. Checkpoints store the progress and state of your stream, enabling seamless recovery without duplicates or missed data.
- Scalability: Handle large-scale data streams while maintaining high throughput.
- Ease of Use: Integrates seamlessly with Spark’s DataFrame and Dataset APIs.
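For instance, a streaming pipeline that reads an Iceberg table and upserts it into Pinecone could look like the sketch below. This is a hypothetical outline, not connector documentation: the table name db.embeddings is an assumption, the Iceberg Spark runtime and catalog configuration are assumed to be in place, and the table is assumed to already contain columns matching the connector's expected schema.
import org.apache.spark.sql.SparkSession

object IcebergStreamSketch extends App {
  // Minimal sketch: assumes the Iceberg Spark runtime is on the classpath, a catalog
  // is configured, and the table columns already match the connector's COMMON_SCHEMA.
  val spark = SparkSession.builder()
    .appName("IcebergToPineconeSketch")
    .getOrCreate()

  // Stream newly committed rows from an Iceberg table (hypothetical name: db.embeddings)
  val records = spark.readStream
    .format("iceberg")
    .load("db.embeddings")

  // Write the stream to Pinecone; credentials are read from environment variables here
  val query = records.writeStream
    .format("io.pinecone.spark.pinecone.Pinecone")
    .option("pinecone.apiKey", sys.env("PINECONE_API_KEY"))
    .option("pinecone.indexName", sys.env("PINECONE_INDEX_NAME"))
    .option("checkpointLocation", "path/to/checkpoint/dir")
    .outputMode("append")
    .start()

  query.awaitTermination()
}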
Examples:
Notes for the examples:
- Databricks users should download the assembly jar to avoid dependency conflicts. The assembly jars can be found here: https://github.com/pinecone-io/spark-pinecone?tab=readme-ov-file#databricks-and-friends
- The input JSONL files used in the examples can be found here: https://github.com/pinecone-io/spark-pinecone/tree/main/src/it/resources
- Checkpointing: Ensure the checkpointLocation points to a durable and unique directory to enable fault tolerance and prevent duplicate processing.
The following example shows how to stream upsert records into Pinecone using PySpark:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, ArrayType, FloatType, StringType, LongType

# Your API key, index name, and source tag
api_key = "PINECONE_API_KEY"
index_name = "PINECONE_INDEX_NAME"
source_tag = "PINECONE_SOURCE_TAG"

# Declare the schema for the input records
COMMON_SCHEMA = StructType([
    StructField("id", StringType(), False),
    StructField("namespace", StringType(), True),
    StructField("values", ArrayType(FloatType(), False), False),
    StructField("metadata", StringType(), True),
    StructField("sparse_values", StructType([
        StructField("indices", ArrayType(LongType(), False), False),
        StructField("values", ArrayType(FloatType(), False), False)
    ]), True)
])

# Initialize Spark session
spark = SparkSession.builder \
    .appName("StreamUpsertExample") \
    .config("spark.sql.shuffle.partitions", 3) \
    .master("local") \
    .getOrCreate()

# Read the stream of JSON files from the input directory, applying the schema
lines = spark.readStream \
    .option("multiLine", True) \
    .option("mode", "PERMISSIVE") \
    .schema(COMMON_SCHEMA) \
    .json("path/to/input/directory/")

# Write the stream to Pinecone using the defined options
upsert = lines.writeStream \
    .format("io.pinecone.spark.pinecone.Pinecone") \
    .option("pinecone.apiKey", api_key) \
    .option("pinecone.indexName", index_name) \
    .option("pinecone.sourceTag", source_tag) \
    .option("checkpointLocation", "path/to/checkpoint/dir") \
    .outputMode("append") \
    .start()

upsert.awaitTermination()
The following example shows how to stream upsert records into Pinecone using scala-spark:
import io.pinecone.spark.pinecone.{COMMON_SCHEMA, PineconeOptions}
import org.apache.spark.sql.SparkSession

object MainApp extends App {
  // Your API key, index name, and source tag
  val apiKey = "PINECONE_API_KEY"
  val indexName = "PINECONE_INDEX_NAME"
  val sourceTag = "PINECONE_SOURCE_TAG"

  // Create a Spark session
  val spark = SparkSession.builder()
    .appName("StreamUpsertExample")
    .config("spark.sql.shuffle.partitions", 3)
    .master("local")
    .getOrCreate()

  // Read the stream of JSON files from the input directory, applying COMMON_SCHEMA
  val lines = spark.readStream
    .option("multiLine", value = true)
    .option("mode", "PERMISSIVE")
    .schema(COMMON_SCHEMA)
    .json("path/to/input/directory/")

  // Define Pinecone options as a Map
  val pineconeOptions = Map(
    PineconeOptions.PINECONE_API_KEY_CONF -> apiKey,
    PineconeOptions.PINECONE_INDEX_NAME_CONF -> indexName,
    PineconeOptions.PINECONE_SOURCE_TAG_CONF -> sourceTag
  )

  // Write the stream to Pinecone using the defined options
  val upsert = lines
    .writeStream
    .format("io.pinecone.spark.pinecone.Pinecone")
    .options(pineconeOptions)
    .option("checkpointLocation", "path/to/checkpoint/dir")
    .outputMode("append")
    .start()

  upsert.awaitTermination()
}
What's Changed
- Update README.md by @rohanshah18 in #37
- Add issue templates and auto-labeling by @anawishnoff in #38
- Add path to the sample.jsonl file in README by @rohanshah18 in #39
- Add support for stream upserts by @rohanshah18 in #42
New Contributors
- @anawishnoff made their first contribution in #38
Full Changelog: v1.1.0...v1.2.0
v1.1.0 Release
Added: Support for source tags
Partners can now add source tags to the spark connector.
Example:
The following example shows how to set a source tag when upserting records into Pinecone using PySpark:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, ArrayType, FloatType, StringType, LongType

# Initialize Spark session with the spark-pinecone dependency
spark = SparkSession.builder.getOrCreate()

# Your API key, index name, and source tag
api_key = "PINECONE_API_KEY"
index_name = "PINECONE_INDEX_NAME"
source_tag = "PINECONE_SOURCE_TAG"

# Declare the schema
COMMON_SCHEMA = StructType([
    StructField("id", StringType(), False),
    StructField("namespace", StringType(), True),
    StructField("values", ArrayType(FloatType(), False), False),
    StructField("metadata", StringType(), True),
    StructField("sparse_values", StructType([
        StructField("indices", ArrayType(LongType(), False), False),
        StructField("values", ArrayType(FloatType(), False), False)
    ]), True)
])

# Read the file and apply the schema
df = spark.read \
    .option("multiLine", True) \
    .option("mode", "PERMISSIVE") \
    .schema(COMMON_SCHEMA) \
    .json("/FileStore/tables/sample-4.jsonl")

# Show if the read was successful
print("df count:", df.count(), "should be 7")
df.show()

# Write to Pinecone
df.write \
    .option("pinecone.apiKey", api_key) \
    .option("pinecone.indexName", index_name) \
    .option("pinecone.sourceTag", source_tag) \
    .format("io.pinecone.spark.pinecone.Pinecone") \
    .mode("append") \
    .save()
The following example shows how to set a source tag when upserting records into Pinecone using scala-spark:
import io.pinecone.spark.pinecone.{COMMON_SCHEMA, PineconeOptions}
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}

object MainApp extends App {
  // Define the Pinecone API key, index name, and source tag
  val apiKey = "PINECONE_API_KEY"
  val indexName = "PINECONE_INDEX_NAME"
  val sourceTag = "PINECONE_SOURCE_TAG"

  // Configure Spark to run locally with all available cores
  val conf = new SparkConf().setMaster("local[*]")

  // Create a Spark session with the defined configuration
  val spark = SparkSession.builder().config(conf).getOrCreate()

  // Read the JSON file into a DataFrame, applying the COMMON_SCHEMA
  val df = spark.read
    .option("multiLine", value = true)
    .option("mode", "PERMISSIVE")
    .schema(COMMON_SCHEMA)
    .json("src/test/resources/sample.jsonl") // path to sample.jsonl

  // Define Pinecone options as a Map
  val pineconeOptions = Map(
    PineconeOptions.PINECONE_API_KEY_CONF -> apiKey,
    PineconeOptions.PINECONE_INDEX_NAME_CONF -> indexName,
    PineconeOptions.PINECONE_SOURCE_TAG_CONF -> sourceTag
  )

  // Show if the read was successful
  println(s"df count: ${df.count()} (should be 7)")
  df.show(df.count().toInt)

  // Write the DataFrame to Pinecone using the defined options
  df.write
    .options(pineconeOptions)
    .format("io.pinecone.spark.pinecone.Pinecone")
    .mode(SaveMode.Append)
    .save()
}
Updated: Metadata size limit to 40 KB
Previously, users could only upsert records with metadata up to 5 KB in size. With this release, users can upsert records with metadata up to 40 KB.
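As a rough illustration only (not part of the connector API), metadata is supplied as a JSON string in the connector schema, so a client-side check against the new limit might look like the hypothetical snippet below; the metadata content itself is made up.
// Hypothetical client-side check of the new 40 KB metadata limit
val metadata = """{"genre":"documentary","year":2019,"tags":["science","nature"]}"""
val metadataBytes = metadata.getBytes("UTF-8").length
require(metadataBytes <= 40 * 1024, s"metadata is $metadataBytes bytes, over the 40 KB limit")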
Updated: Pinecone Java SDK client v1.0.0 to v1.2.2
The Spark connector relies on the Pinecone Java SDK, and as part of this release we have updated the Java SDK client from v1.0.0 to v1.2.2.
What's Changed
- Remove extra variables from scala-spark example and remove runTest step from release process by @rohanshah18 in #33
- Update max metadata size to 40 KB by @rohanshah18 in #34
- Update java sdk to v1.2.2. and add source tag by @rohanshah18 in #35
- Release v1.1.0 by @rohanshah18 in #36
Full Changelog: v1.0.0...v1.1.0
v1.0.0 Release
Added: Support for serverless indexes
Previously, users could only upsert records into pod indexes. With this release, users now have the capability to upsert records into serverless indexes as well.
Updated: Datatype of sparse indices from signed 32-bit integers to unsigned 32-bit integers
Pinecone's backend API expects sparse indices to be unsigned 32-bit integers, while the Spark connector previously accepted signed 32-bit integers. To address this limitation, sparse indices are now accepted as Long (instead of Int), with an input range of [0, 2^32 - 1]. Any value outside this range throws an IllegalArgumentException.
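The snippet below simply restates the accepted range as an illustration; it is not connector code.
// Sparse indices are read as Long but must fall within the unsigned 32-bit range
val maxUnsignedInt: Long = (1L << 32) - 1           // 4294967295
val validIndices   = Seq(0L, 2L, maxUnsignedInt)    // accepted
val invalidIndices = Seq(-1L, maxUnsignedInt + 1)   // the connector throws IllegalArgumentException for these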
Removed: projectName and environment variables
Users are no longer required to provide projectName and environment as input fields when upserting records. Endpoint resolution is handled by the underlying Java SDK without either variable, so apiKey and indexName are the only required parameters.
Updated: Pinecone Java SDK client v0.7.4 to v1.0.0
The Spark connector relies on the Pinecone Java SDK, and as part of this release we have updated the Java SDK client from v0.7.4 to v1.0.0.
Example:
The following example shows how to upsert records into Pinecone using scala-spark:
import io.pinecone.spark.pinecone.{COMMON_SCHEMA, PineconeOptions}
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}

object MainApp extends App {
  val conf = new SparkConf()
    .setMaster("local[*]")

  val apiKey = "PINECONE_API_KEY"
  val indexName = "PINECONE_INDEX_NAME"

  val spark = SparkSession
    .builder()
    .config(conf)
    .getOrCreate()

  val df = spark.read
    .option("multiLine", value = true)
    .option("mode", "PERMISSIVE")
    .schema(COMMON_SCHEMA)
    .json("path_to_sample.jsonl")
    .repartition(2)

  val pineconeOptions = Map(
    PineconeOptions.PINECONE_API_KEY_CONF -> apiKey,
    PineconeOptions.PINECONE_INDEX_NAME_CONF -> indexName
  )

  df.write
    .options(pineconeOptions)
    .format("io.pinecone.spark.pinecone.Pinecone")
    .mode(SaveMode.Append)
    .save()
}
The following example shows how to upsert records into Pinecone using python-spark:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, ArrayType, FloatType, StringType, LongType

# Your API key and index name
api_key = "PINECONE_API_KEY"
index_name = "PINECONE_INDEX_NAME"

COMMON_SCHEMA = StructType([
    StructField("id", StringType(), False),
    StructField("namespace", StringType(), True),
    StructField("values", ArrayType(FloatType(), False), False),
    StructField("metadata", StringType(), True),
    StructField("sparse_values", StructType([
        StructField("indices", ArrayType(LongType(), False), False),
        StructField("values", ArrayType(FloatType(), False), False)
    ]), True)
])

# Initialize Spark
spark = SparkSession.builder.getOrCreate()

# Read the file and apply the schema
df = spark.read \
    .option("multiLine", True) \
    .option("mode", "PERMISSIVE") \
    .schema(COMMON_SCHEMA) \
    .json("/FileStore/tables/sample-4.jsonl")

# Show if the read was successful
df.show()

# Write to Pinecone
df.write \
    .option("pinecone.apiKey", api_key) \
    .option("pinecone.indexName", index_name) \
    .format("io.pinecone.spark.pinecone.Pinecone") \
    .mode("append") \
    .save()
The sample.jsonl file used as input in the above scala-spark and python-spark examples is shown below.
[
  {
    "id": "v1",
    "namespace": "default",
    "values": [1, 2, 3],
    "metadata": {
      "hello": ["world", "you"],
      "numbers": "or not",
      "actual_number": 5.2,
      "round": 3
    },
    "sparse_values": {
      "indices": [0, 2],
      "values": [5.5, 5]
    }
  },
  {
    "id": "v2",
    "values": [3, 2, 1]
  },
  {
    "id": "v3",
    "values": [1, 4, 9],
    "namespace": "default"
  }
]
Databricks users:
Please import the spark-pinecone assembly jar from S3: s3://pinecone-jars/1.0.0/spark-pinecone-uberjar.jar.
What's Changed
- Add s3 v0.2.2 assembly jar url by @rohanshah18 in #25
- Update java sdk to v1 to add support for serverless indexes and accept sparse indices within the range of unsigned 32-bit integers by @rohanshah18 in #31
- Update README for v1 by @rohanshah18 in #32
Full Changelog: v0.2.2...v1.0.0
v0.2.2 Release
Updated: Pinecone Java SDK client v0.6.0 to v0.7.4
The Spark connector relies on the Pinecone Java SDK, and as part of this release we have updated the Java SDK client from v0.6.0 to v0.7.4. Pinecone Java SDK client v0.6.0 had index-creation issues, which are resolved in v0.7.4. Even though the Spark connector doesn't offer index creation, users can import the Pinecone Java SDK client into their sbt or Gradle project alongside the Spark connector for additional control- and data-plane operations, as sketched below.
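A minimal sbt sketch of pulling in the Java SDK alongside the connector might look like the following; the artifact coordinates are assumptions, so verify them on Maven Central before use.
// build.sbt (sketch) -- coordinates are assumptions; confirm on Maven Central
libraryDependencies ++= Seq(
  "io.pinecone" %% "spark-pinecone"  % "0.2.2",  // the Spark connector
  "io.pinecone" %  "pinecone-client" % "0.7.4"   // the Java SDK, for extra control- and data-plane operations
)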
Databricks users:
Please import the spark-pinecone assembly jar from S3: s3://pinecone-jars/0.2.2/spark-pinecone-uberjar.jar.
What's Changed
- Update README with s3 path for latest jar and clean up sample.json by @rohanshah18 in #21
- Update pyspark example by @rohanshah18 in #22
- update usage on readme by @rohanshah18 in #23
- Update pinecone-java-sdk version by @rohanshah18 in #24
Full Changelog: v0.2.1...v0.2.2
v0.2.1 Release
Added: Upsert Sparse Vectors
We have added support for inserting and updating sparse vectors in the spark-pinecone connector. The basic vector type in Pinecone is a dense vector. Pinecone also supports vectors with sparse and dense values together, which allows users to perform hybrid search on their Pinecone index. Hybrid search combines semantic and keyword search in one query for more relevant results.
Example:
The following example shows how to upsert sparse-dense vectors into Pinecone.
import io.pinecone.spark.pinecone.{COMMON_SCHEMA, PineconeOptions}
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}

object MainApp extends App {
  // Your API key, environment, project name, and index name
  val apiKey = "PINECONE_API_KEY"
  val environment = "PINECONE_ENVIRONMENT"
  val projectName = "PINECONE_PROJECT_NAME"
  val indexName = "PINECONE_INDEX_NAME"

  val conf = new SparkConf()
    .setMaster("local[*]")

  val spark = SparkSession
    .builder()
    .config(conf)
    .getOrCreate()

  val df = spark.read
    .option("multiLine", value = true)
    .option("mode", "PERMISSIVE")
    .schema(COMMON_SCHEMA)
    .json("path_to_sample.jsonl")
    .repartition(2)

  val pineconeOptions = Map(
    PineconeOptions.PINECONE_API_KEY_CONF -> apiKey,
    PineconeOptions.PINECONE_ENVIRONMENT_CONF -> environment,
    PineconeOptions.PINECONE_PROJECT_NAME_CONF -> projectName,
    PineconeOptions.PINECONE_INDEX_NAME_CONF -> indexName
  )

  df.write
    .options(pineconeOptions)
    .format("io.pinecone.spark.pinecone.Pinecone")
    .mode(SaveMode.Append)
    .save()
}
The sample.jsonl file used as input in the above Scala code is shown in the next example.
Added: Optional Fields for Input
We've introduced the option to make the namespace, metadata, and sparse_values fields in the input JSON optional with this release. Users can now choose not to include these fields; the only mandatory fields are id and values. Please note that if you include the sparse_values field in the input, both indices and values within sparse_values must be present. The following example of a sample.jsonl file illustrates the possible combinations of input vectors supported by the schema.
[
  {
    "id": "v1",
    "namespace": "default",
    "values": [1, 2, 3],
    "metadata": {
      "hello": ["world", "you"],
      "numbers": "or not",
      "actual_number": 5.2,
      "round": 3
    },
    "sparse_values": {
      "indices": [0, 2],
      "values": [5.5, 5]
    }
  },
  {
    "id": "v2",
    "values": [3, 2, 1]
  },
  {
    "id": "v3",
    "values": [1, 4, 9],
    "namespace": "default"
  }
]
Databricks users:
Please import the spark-pinecone assembly jar from S3: s3://pinecone-jars/0.2.1/spark-pinecone-uberjar.jar.
What's Changed
- Reformat sparse vectors by @rohanshah18 in #20
- Add support for adding sparse values by @rohanshah18 in #18
- Update README by @rohanshah18 in #17
Full Changelog: v0.1.4...v0.2.1
v0.1.4 Release
Added: Publish assembly jar to Maven Central
We have temporarily added support for publishing the assembly jar to Maven Central, but the recommended way to obtain the assembly jar for Scala 2.12 on the Databricks platform is from the S3 bucket: s3://pinecone-jars/spark-pinecone-uberjar.jar
What's Changed
- Publish uber jar to maven central as well by @rohanshah18 in #16
Full Changelog: v0.1.3...v0.1.4
v0.1.3 Release
Fixed: Dependency Cleanup and Compatibility Enhancement
Resolved dependency conflicts by updating the Spark dependency to version 3.5.0 and the Java SDK dependency to version 0.6.0, which now internally relies on gRPC v1.58.0 and netty 2.0.61.Final.
These changes effectively resolve the dependency version conflicts seen in sbt projects as well as Databricks notebooks. Users on the Databricks platform are advised to use the assembly jar from the S3 location: <s3_link>.
Added: Apache 2.0 license
The spark-pinecone connector was already an open-source project, and we have now added the Apache 2.0 license, formally confirming its open-source status.
What's Changed
- Apache 2.0 License by @gdj0nes in #8
- Update spark and java sdk versions to resolve dependency conflicts by @rohanshah18 in #10
Full Changelog: v0.1.1...v0.1.2
v0.1.1
v0.1.0 - Initial release
Initial release of this project, see the README for more details.