SqsClient: Unexpected error while parsing SQS message next on empty iterator #8

Open
kartthikn opened this issue Nov 18, 2020 · 1 comment

kartthikn commented Nov 18, 2020

Error while parsing SQS message

The following data was sent to an SQS queue using the Python boto3 library:

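import json

import boto3

# Assumes `queue_url` holds the queue URL; the region matches the Scala reader below.
sqs = boto3.client('sqs', region_name='eu-west-1')

data = {'ticket_id': '3000613'}

# Send message to SQS queue
response = sqs.send_message(
    QueueUrl=queue_url,
    DelaySeconds=2,
    MessageAttributes={},
    MessageBody=json.dumps(data)
)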

When we receive from the SQS queue, we get the following message:

{
   "MessageId":"95a45ab6-8a6e-4403-90e8-e3d0fb132ec3",
   "ReceiptHandle":"AQEBAw2+roaCV6U8PaNCiuV4cBHKfsb+rboAnQTekZa7CStJ5Hdbu9Nbpvvph5OqNtcupXvtuPM7OOTRXmJGrJ26+DIf/vdshZ+HIcCgqhEbaHBR4L3qQ3o+ClpwoNY0VZAB4VFQPD/mrHTUP9nAfYKGNszuU2Q1riRYgc9ClYO5KOcmdo2POWk+lrW5uDIr95lccuOmj+T0OBzy0pPxFquqOpSAbj7XyGEXRIz/ocW3MCP42WaoT4PdJAII0ylx0BYbZC5qWLqkEc1mYudgUhV1dadFM58xb6Gv71WI00V+RvaZwFbL/T19z9KqIu+Z0F7hH/Tpe15xxHpZ5yl6tSi+QAEoGMD6UshjpizspQ08Q98OEDAP0xLk0F99fC88AVcf8kJ11Icbv5raXzXnTikFxA==",
   "MD5OfBody":"ef4ab943de701a13d7e3359d3f19df98",
   "Body":"{\"ticket_id\": \"3000613\"}",
   "Attributes":{
      "SentTimestamp":"1605700176692"
   }
}

The following Scala code was written to receive the message:

import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Schema of the expected payload: a single string column
val schema = new StructType()
  .add(StructField("ticket_id", StringType))
val fileFormat = "json"

val inputDf = spark
  .readStream
  .format("s3-sqs")
  .schema(schema)
  .option("sqsUrl", queueUrl)
  .option("region", "eu-west-1")
  .option("awsAccessKeyId", "XXXXX")
  .option("awsSecretKey", "XXXXXXX")
  .option("fileFormat", fileFormat)
  .option("sqsFetchIntervalSeconds", "2")
  .option("useInstanceProfileCredentials", "false")
  .option("sqsLongPollingWaitTimeSeconds", "5")
  .option("maxFilesPerTrigger", "50")
  .option("ignoreFileDeletion", "true")
  .load()

val query = inputDf.writeStream
  .queryName("sqs_records")    // this query name will be the table name
  .outputMode("append")
  .format("memory")
  .option("useInstanceProfileCredentials", "false")
  .option("region", "eu-west-1")
  .option("awsAccessKeyId", "XXXXXX")
  .option("awsSecretKey", "XXXXXXXX")
  .start()

We are unable to receive the message and get the following error:

20/11/18 12:52:37 WARN SqsClient: Unexpected error while parsing SQS message next on empty iterator

Note: We observed that the SqsClient.parseSqsMessages() method always expects an S3 event notification message; for any other message body it throws an error.
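
To illustrate the note above, here is a minimal Python sketch of the difference between the body sent in this issue and the body shape used by S3 event notifications (a top-level "Records" array whose entries carry s3.bucket.name and s3.object.key). The helper extract_s3_paths is hypothetical and is not the library's parseSqsMessages() implementation; the bucket and key are made-up placeholders. It only shows why a plain payload leaves an S3-oriented parser with nothing to consume.

```python
import json
from typing import List
from urllib.parse import unquote_plus


def extract_s3_paths(body: str) -> List[str]:
    """Return s3:// paths found in an S3 event notification body.

    Hypothetical helper for illustration only; returns an empty list when
    the body is not shaped like an S3 event notification.
    """
    try:
        payload = json.loads(body)
    except json.JSONDecodeError:
        return []
    if not isinstance(payload, dict):
        return []
    paths = []
    for record in payload.get("Records", []):
        s3 = record.get("s3", {})
        bucket = s3.get("bucket", {}).get("name")
        key = s3.get("object", {}).get("key")
        if bucket and key:
            # Object keys in S3 event notifications are URL-encoded.
            paths.append(f"s3://{bucket}/{unquote_plus(key)}")
    return paths


# Body sent in this issue: valid JSON, but not an S3 event notification.
plain_body = json.dumps({"ticket_id": "3000613"})

# Trimmed S3 event notification; bucket and key are placeholders.
s3_event_body = json.dumps({
    "Records": [{
        "eventSource": "aws:s3",
        "eventName": "ObjectCreated:Put",
        "s3": {
            "bucket": {"name": "example-bucket"},
            "object": {"key": "tickets/3000613.json"},
        },
    }]
})

print(extract_s3_paths(plain_body))     # []
print(extract_s3_paths(s3_event_body))  # ['s3://example-bucket/tickets/3000613.json']
```

The first call returns an empty list because the ticket payload has no "Records" entry, which is consistent with the "next on empty iterator" warning if the reader tries to take the first parsed record without checking.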

@abhishekd0907
Collaborator

@kartthikn
This library allows Spark SQL Streaming applications to read files from S3 with optimized listing, using SQS to receive S3 event-based notifications. It is not meant for using SQS itself as a data source for Spark.
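
As a rough sketch of the workflow this implies, the producer would write the payload to S3 as a JSON file instead of sending it to SQS directly, and S3 would then publish the event notification to the queue. The bucket name, the key layout, and the write_ticket_to_s3 helper below are assumptions for illustration, and the bucket is assumed to already have event notifications to the SQS queue configured.

```python
import json


def write_ticket_to_s3(s3_client, bucket: str, data: dict) -> str:
    """Upload `data` as a single-line JSON file and return the object key.

    Hypothetical helper: `s3_client` is expected to be a boto3 S3 client,
    and the "tickets/<ticket_id>.json" key layout is an assumption.
    """
    key = f"tickets/{data['ticket_id']}.json"
    s3_client.put_object(
        Bucket=bucket,
        Key=key,
        # One JSON object on one line, which Spark's "json" file format can read.
        Body=json.dumps(data).encode("utf-8"),
    )
    return key


# Usage (requires AWS credentials; bucket name is a placeholder):
#   import boto3
#   s3 = boto3.client("s3", region_name="eu-west-1")
#   write_ticket_to_s3(s3, "example-bucket", {"ticket_id": "3000613"})
```

With this arrangement the existing "s3-sqs" reader and its ticket_id schema would apply to the uploaded file contents rather than to the SQS message body.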
