Peeking into SQS Queues Locally with LocalStack's SQS Developer Endpoint
Learn how to debug AWS SQS queues locally using LocalStack's SQS Developer Endpoint to inspect delayed messages, in-flight messages, and retry flows without side effects or disrupting queue behavior.

Introduction
Debugging distributed systems involving message queues like AWS Simple Queue Service (SQS) can often feel like working with a black box, especially during local development. You’re left wondering:
- Did the message arrive correctly?
- Is it currently being processed?
- Was a retry scheduled with the right delay?
In AWS, checking the state of an SQS queue without affecting its normal behavior is difficult. That’s where LocalStack helps: its SQS Developer Endpoint lets you inspect local queues through an SQS-compatible API without manipulating the queue.
This tutorial shows how to build a simple local SQS workflow with LocalStack and use the SQS Developer Endpoint to gain insights during debugging.
Challenges with SQS inspection
In AWS, messages in an SQS queue are viewed using the `ReceiveMessage` API, but it actively affects the queue and isn’t meant for passive inspection. When `ReceiveMessage` is called, AWS marks the message as in-flight and hides it for the visibility timeout (default 30s, max 12h). Because of this design:
- You can’t check if specific payloads arrived without starting their visibility timer.
- Delayed or in-flight messages stay hidden.
- You might fetch the same message more than once due to “at-least-once” delivery.
Developers often use awkward workarounds, such as setting `VisibilityTimeout=0` or polling logs, which slow down feedback loops.
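For example, the zero-visibility-timeout workaround looks roughly like the sketch below (the queue URL is a placeholder). Note that it still consumes a receive and is far from side-effect free:

```bash
# Peek by receiving with a zero visibility timeout so messages
# reappear immediately. This still counts as a receive: it increments
# ApproximateReceiveCount and can trip a redrive (DLQ) policy.
aws sqs receive-message \
  --queue-url "https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue" \
  --visibility-timeout 0 \
  --max-number-of-messages 10
```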
Inspecting SQS Queues with LocalStack
LocalStack provides a custom HTTP endpoint for inspecting SQS queues without side effects: `http://localhost:4566/_aws/sqs/messages`. It lets you peek into a queue and view all messages, including invisible or delayed ones, without changing their state. Key benefits of this approach are:
- No Side-Effects: Doesn’t alter visibility or metrics, and doesn’t trigger timeouts like the `ReceiveMessage` API does.
- Complete View: Query with `ShowInvisible=true` or `ShowDelayed=true` to see all message states.
- AWS-Compatible: Returns AWS-compatible response formats (XML/JSON), which work with existing tools. An example request follows below.
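For a quick taste, a request to the endpoint looks roughly like this (the queue URL is a placeholder; the `Accept: application/json` header asks for JSON instead of the default XML, just as the peek script later in this tutorial does):

```bash
curl -s -H "Accept: application/json" \
  "http://localhost:4566/_aws/sqs/messages?QueueUrl=http://sqs.us-east-1.localhost.localstack.cloud:4566/000000000000/MyQueue&ShowInvisible=true&ShowDelayed=true"
```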
Now that our core concepts are clear, let’s build a sample app to show how this feature improves our local development workflows.
Prerequisites
- AWS CLI with the `awslocal` wrapper script
- `localstack` CLI
- Docker
- Python 3.x & `pip`
- Standard command-line tools: `zip` & `curl`
Step 1: Project Setup
In this project, we’ll deploy a producer Lambda triggered by a Function URL and a consumer Lambda triggered by SQS. The consumer will simulate transient failures to show how delayed messages can be inspected without side effects.
We’ll deploy these local AWS resources using LocalStack:
- SQS Queue (`JobQueueDemo`): Receives job messages.
- Producer Lambda (`src/job_producer.py`): Triggered by a Function URL, accepts `POST` requests with job details, and sends messages to the SQS queue.
- Consumer Lambda (`src/job_consumer.py`): Triggered by the SQS queue, processes jobs based on `jobType` (`success`, `transient_fail`, `permanent_fail`), and handles delayed retries for transient failures.
Additionally, we will use a Python script to query the LocalStack SQS developer endpoint (`/_aws/sqs/messages`) non-destructively, showing visible, invisible, and delayed messages.
1.1: Set up the project
Create the following directory structure:
```text
localstack-sqs-peek-demo/
├── src/
│   ├── job_producer.py   # Producer Lambda
│   ├── job_consumer.py   # Consumer Lambda
│   └── peek_queue.py     # Helper: peeks into the queue
└── requirements.txt
```
1.2: Add the dependencies
Add the necessary Python libraries to `requirements.txt`:

```text
boto3
requests
```
Create and activate a virtual environment:

```bash
python3 -m venv .venv
source .venv/bin/activate
```
Install the dependencies:
```bash
pip install -r requirements.txt
```
Step 2: Create the Producer Lambda
This Lambda is the system’s entry point, exposed via a Function URL for HTTP access. It receives job requests, formats them, and sends them to the `JobQueueDemo` SQS queue.
2.1: Add Imports and Initial Setup
In this step, we:
- Import necessary Python libraries.
- Initialize the Boto3 SQS client, which auto-detects LocalStack’s hostname.
- Retrieve the target SQS queue URL from the `QUEUE_URL` environment variable.
```python
import json
import os
import uuid
import datetime
import boto3

sqs = boto3.client("sqs")
QUEUE_URL = os.environ.get("QUEUE_URL")
```
2.2: Add Lambda Handler Function
The `handler` function is the entry point executed when the Lambda Function URL is invoked.
```python
def handler(event, context):
    """Lambda Function URL Handler."""
    if not QUEUE_URL:
        return {"statusCode": 500, "body": json.dumps({"error": "Producer misconfiguration"})}

    try:
        # Assuming POST request via Function URL
        body_str = event.get("body", "{}")
        request_body = json.loads(body_str)

        job_type = request_body.get("jobType")
        payload = request_body.get("payload", {})

        allowed_job_types = ["success", "transient_fail", "permanent_fail"]
        if not job_type or job_type not in allowed_job_types:
            return {"statusCode": 400, "body": json.dumps({
                "error": "Bad Request",
                "message": f"Missing or invalid 'jobType'. Must be one of: {', '.join(allowed_job_types)}"
            })}

        job_id = str(uuid.uuid4())

        # Create the message body to be sent to SQS
        message_body = {
            "jobId": job_id,
            "jobType": job_type,
            "payload": payload,
            "submittedAt": datetime.datetime.utcnow().isoformat() + "Z"
        }

        # Send the message to the SQS queue
        sqs.send_message(
            QueueUrl=QUEUE_URL,
            MessageBody=json.dumps(message_body)
        )

        response_payload = {"message": "Job submitted successfully", "jobId": job_id}
        return {"statusCode": 202, "body": json.dumps(response_payload)}  # 202 Accepted

    except Exception as e:
        return {"statusCode": 500, "body": json.dumps({"error": "Internal Server Error", "message": str(e)})}
```
2.3: Package the Producer Lambda
Finally, we package the Lambda function from the project root using:
```bash
zip -j job_producer.zip src/job_producer.py
```
In Step 4, we’ll deploy the Lambda using `awslocal`.
Step 3: Create the Consumer Lambda
This Lambda function is triggered automatically when messages arrive in the `JobQueueDemo` SQS queue. It processes each message based on the `jobType` specified in the message body.
3.1: Add Imports and Initial Setup
Similar to the Producer Lambda, we import libraries, initialize the Boto3 SQS client, and fetch the `QUEUE_URL` for retries. We also define retry constants: `MAX_RETRIES` and `RETRY_DELAY_SECONDS`.
```python
import json
import os
import boto3

sqs = boto3.client("sqs")

# Get Queue URL from environment variable set during Lambda creation
QUEUE_URL = os.environ.get("QUEUE_URL")

MAX_RETRIES = 3
RETRY_DELAY_SECONDS = 15  # Make delay noticeable for demo
```
3.2: Create Job Processing Logic
This helper function contains the core logic for handling a single job message based on its `jobType`.
```python
def process_job(job_data, message_id):
    """Processes the job based on its type."""
    job_id = job_data.get("jobId", "unknown")
    job_type = job_data.get("jobType", "success")
    retry_count = job_data.get("retryCount", 0)

    # --- Success Path ---
    if job_type == "success":
        return {"status": "success"}

    # --- Transient Failure Path (Retry Logic) ---
    elif job_type == "transient_fail":
        new_retry_count = retry_count + 1

        # Check if max retries have been exceeded
        if new_retry_count > MAX_RETRIES:
            return {"status": "failed_max_retries"}  # Give up

        # --- Schedule Delayed Retry ---
        job_data["retryCount"] = new_retry_count
        try:
            # Send a *new* message back to the queue with a delay
            sqs.send_message(
                QueueUrl=QUEUE_URL,
                MessageBody=json.dumps(job_data),   # Include updated retry count
                DelaySeconds=RETRY_DELAY_SECONDS    # The crucial part for our demo!
            )
            return {"status": "retry_scheduled"}
        except Exception as e:
            return {"status": "failed_scheduling_retry", "error": str(e)}

    # --- Permanent Failure Path ---
    elif job_type == "permanent_fail":
        return {"status": "failed_permanently"}

    else:
        return {"status": "success_unknown_type"}
```
The `process_job` function handles jobs by type:
- `success` returns immediately.
- `transient_fail` increments `retryCount` and, if within `MAX_RETRIES`, re-queues the message with a delay using `DelaySeconds` (see the sketch after this list).
- `permanent_fail` skips retries and returns failure.
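For illustration, after the first transient failure the re-queued message body would look roughly like this (IDs and timestamp are placeholders; `originalMessageId` is added by the handler shown in the next step):

```json
{
  "jobId": "acda8379-...",
  "jobType": "transient_fail",
  "payload": { "id": "task-002", "attempt": 1 },
  "submittedAt": "2025-01-01T12:00:00Z",
  "originalMessageId": "d3afc10c-...",
  "retryCount": 1
}
```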
3.3: Add Lambda Handler Function
This function orchestrates the processing of the message batch received from SQS.
```python
def handler(event, context):
    """Lambda handler triggered by SQS."""
    if not QUEUE_URL:
        # Cannot proceed without it
        return {"batchItemFailures": [{"itemIdentifier": r.get("messageId")} for r in event.get("Records", [])]}

    failed_message_ids = []

    for record in event.get("Records", []):
        message_id = record.get("messageId")
        try:
            body_str = record.get("body", "{}")
            job_data = json.loads(body_str)

            # Add original message ID for tracing if it's not already there (from retries)
            if "originalMessageId" not in job_data:
                job_data["originalMessageId"] = message_id

            processing_result = process_job(job_data, message_id)

            # Only report failure if we couldn't even schedule the retry
            if processing_result.get("status") == "failed_scheduling_retry":
                failed_message_ids.append({"itemIdentifier": message_id})

        except Exception as e:
            # Catch-all for unexpected errors during processing
            failed_message_ids.append({"itemIdentifier": message_id})

    # --- Report Batch Failures ---
    # Return the list of message IDs that failed processing to SQS.
    # SQS will *not* delete these messages; they will become visible again
    # after the visibility timeout. If this list is empty, SQS deletes all
    # successfully processed messages in the batch.
    response = {"batchItemFailures": failed_message_ids}
    return response
```
The handler loops through each `record`, parses the body, and calls `process_job`. If retry scheduling fails, it marks the message as failed by collecting its `messageId`. It returns a `batchItemFailures` list to SQS; only messages in this list are retried, while the others are removed from the queue.
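For reference, a partial batch response reporting a single failed message has this shape (the identifier is a placeholder):

```json
{
  "batchItemFailures": [
    { "itemIdentifier": "d3afc10c-f820-4845-aa70-89776224d131" }
  ]
}
```

Note that SQS only honors this response when the event source mapping enables the `ReportBatchItemFailures` function response type, which we’ll do in Step 4.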
3.4: Package the Consumer Lambda
Finally, we package the Lambda function from the project root using:
```bash
zip -j job_consumer.zip src/job_consumer.py
```
This consumer logic, with delayed retries, will help us understand how LocalStack’s SQS Developer Endpoint reveals normally hidden queue behavior.
In the next step, we’ll deploy the Lambda using `awslocal`.
Step 4: Deploying Locally with LocalStack
Time to bring this thing to life! We’ll now set up all required resources in LocalStack: create the queue, package and deploy both Lambdas, set up the producer’s Function URL, and link the consumer to the SQS trigger.
4.1: Start LocalStack
Before deploying, make sure LocalStack is up and running:
```bash
localstack start   # -d to run in detached mode
```
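Optionally, you can confirm LocalStack is ready by querying its health endpoint before proceeding:

```bash
# Lists the status of LocalStack services (sqs, lambda, etc.)
curl -s http://localhost:4566/_localstack/health | python3 -m json.tool
```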
4.2: Create the SQS Queue
```bash
QUEUE_URL=$(awslocal sqs create-queue --queue-name JobQueueDemo --query QueueUrl --output text)
echo $QUEUE_URL
```
The following output should be returned:
```text
http://sqs.us-east-1.localhost.localstack.cloud:4566/000000000000/JobQueueDemo
```
Next, grab the Queue ARN, which we’ll use to create the Lambda Event Source Mapping:
```bash
QUEUE_ARN=$(awslocal sqs get-queue-attributes --queue-url "${QUEUE_URL}" --attribute-names QueueArn --query Attributes.QueueArn --output text)
echo $QUEUE_ARN
```
The output would be:
```text
arn:aws:sqs:us-east-1:000000000000:JobQueueDemo
```
4.3: Deploy the Lambda functions
First, we’ll deploy the Producer Lambda and create its Function URL:
```bash
PRODUCER_LAMBDA_ARN=$(awslocal lambda create-function \
  --function-name job-producer-demo \
  --runtime python3.10 \
  --role "arn:aws:iam::000000000000:role/lambda-demo-role" \
  --handler job_producer.handler \
  --zip-file fileb://job_producer.zip \
  --environment "Variables={QUEUE_URL=${QUEUE_URL}}" \
  --timeout 15 \
  --query FunctionArn --output text)
```
```bash
PRODUCER_FUNCTION_URL=$(awslocal lambda create-function-url-config \
  --function-name job-producer-demo \
  --auth-type NONE \
  --region us-east-1 \
  --query 'FunctionUrl' --output text)

echo $PRODUCER_FUNCTION_URL
```
The following output should be returned:
```text
http://9iyye06d0qzr1n9mgm2ukjrif4v6qjea.lambda-url.us-east-1.localhost.localstack.cloud:4566/
```
The Function URL ID may vary in your setup. We’ll use the `PRODUCER_FUNCTION_URL` variable to reference it in the next steps.
Next, we’ll deploy the Consumer Lambda:
```bash
CONSUMER_LAMBDA_ARN=$(awslocal lambda create-function \
  --function-name job-consumer-demo \
  --runtime python3.10 \
  --role "arn:aws:iam::000000000000:role/lambda-demo-role" \
  --handler job_consumer.handler \
  --zip-file fileb://job_consumer.zip \
  --environment "Variables={QUEUE_URL=${QUEUE_URL}}" \
  --timeout 60 \
  --query FunctionArn --output text)
```
4.4: Create the Event Source Mapping
Finally, we’ll create the Event Source Mapping for the Consumer Lambda. We also enable the `ReportBatchItemFailures` response type so that SQS honors the `batchItemFailures` list returned by the handler:

```bash
awslocal lambda create-event-source-mapping \
  --function-name job-consumer-demo \
  --event-source-arn ${QUEUE_ARN} \
  --batch-size 5 \
  --function-response-types ReportBatchItemFailures
```
This connects our local SQS queue to our Lambda function. Messages arriving in the queue will then automatically invoke the Consumer Lambda function.
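As an optional sanity check, you can confirm the mapping exists and is enabled:

```bash
# Should print a list containing "Enabled" once the mapping is active
awslocal lambda list-event-source-mappings \
  --function-name job-consumer-demo \
  --query 'EventSourceMappings[].State'
```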
Now that we’ve configured our infrastructure, it’s time to put the SQS Developer Endpoint to the test.
Step 5: Interacting and Peeking into the SQS queue
Now, let’s run the workflow and use the SQS Developer Endpoint.
5.1: Create a Peek Script
We’ll create a simple Python script using the `requests` library to call LocalStack’s SQS developer endpoint with `ShowInvisible=true` and `ShowDelayed=true`, and print the response.
As an alternative, you can use cURL or Postman, but the script makes it easier to customize and parse results.
```python
import os
import requests
import json

# Get Queue URL from environment variable
QUEUE_URL = os.getenv("QUEUE_URL")
LOCALSTACK_ENDPOINT = os.getenv("AWS_ENDPOINT_URL", "http://localhost:4566")

# Construct the developer endpoint URL
peek_endpoint = f"{LOCALSTACK_ENDPOINT}/_aws/sqs/messages"

print(f"Peeking into queue: {QUEUE_URL}")
print(f"Using dev endpoint: {peek_endpoint}")

try:
    r = requests.get(
        peek_endpoint,
        params={
            "QueueUrl": QUEUE_URL,
            "ShowInvisible": "true",  # Show messages currently in-flight
            "ShowDelayed": "true"     # Show messages sent with DelaySeconds
        },
        headers={"Accept": "application/json"},
        timeout=10
    )
    r.raise_for_status()
    print(json.dumps(r.json(), indent=2))
except Exception as e:
    print(f"An unexpected error occurred: {e}")
```
5.2: Export the variables
To use the peek script or invoke the producer, export these variables:
```bash
export QUEUE_URL="${QUEUE_URL}"
export PRODUCER_FUNCTION_URL="${PRODUCER_FUNCTION_URL}"
```
5.3: Send the jobs
We’ll use `curl` to send `POST` requests to the Producer Lambda’s Function URL for three scenarios:
- A successful job with task ID `task-001`.
- A transient failure with task ID `task-002`.
- Another transient failure with task ID `task-003`.
Let’s send the following requests:
```bash
curl -X POST "${PRODUCER_FUNCTION_URL}" \
  -H "Content-Type: application/json" \
  -d '{ "jobType": "success", "payload": {"id": "task-001"} }'
```

```bash
curl -X POST "${PRODUCER_FUNCTION_URL}" \
  -H "Content-Type: application/json" \
  -d '{ "jobType": "transient_fail", "payload": {"id": "task-002", "attempt": 1} }'
```

```bash
curl -X POST "${PRODUCER_FUNCTION_URL}" \
  -H "Content-Type: application/json" \
  -d '{ "jobType": "transient_fail", "payload": {"id": "task-003", "attempt": 1} }'
```
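Each request should return a `202 Accepted` response from the producer along these lines (your `jobId` values will differ):

```json
{"message": "Job submitted successfully", "jobId": "acda8379-a3f7-45cd-8479-9adaa0749f3a"}
```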
5.4: Peeking into the Queue
Now, the key part! Use the `peek_queue.py` script to inspect the queue state without consuming messages.
```bash
python src/peek_queue.py
```
Remember to run the peek script right after sending the messages! You should see output similar to the following:
{ "ReceiveMessageResponse": { "ReceiveMessageResult": { "Message": [ { "MessageId": "d7d21e86-0135-432b-9459-4ac53f4cb9fe", // <-- Original message "Body": "{\"jobId\": \"acda8379-a3f7-45cd-8479-9adaa0749f3a\", \"jobType\": \"success\", \"payload\": {\"id\": \"task-001\"}, ...}", "Attribute": [ ..., { "Name": "IsVisible", "Value": "true" }, { "Name": "IsDelayed", "Value": "false" } ] // <-- All visible, none delayed }, { "MessageId": "d3afc10c-f820-4845-aa70-89776224d131", // <-- Original message "Body": "{\"jobId\": \"fd752c6e-9253-404c-b0ff-114a1e341600\", \"jobType\": \"transient_fail\", \"payload\": {\"id\": \"task-002\", ...}, ...}", "Attribute": [ ..., { "Name": "IsVisible", "Value": "true" }, { "Name": "IsDelayed", "Value": "false" } ] // <-- All visible, none delayed }, { "MessageId": "597c3444-2c12-49da-b1db-65ede215976e", // <-- Original message "Body": "{\"jobId\": \"ca1f8a4a-2cf7-49b4-9060-e829ac72dbb8\", \"jobType\": \"transient_fail\", \"payload\": {\"id\": \"task-003\", ...}, ...}", "Attribute": [ ..., { "Name": "IsVisible", "Value": "true" }, { "Name": "IsDelayed", "Value": "false" } ] // <-- All visible, none delayed } ] }, ... }}
All three messages are in the queue with `IsVisible: true` and `IsDelayed: false`, as expected right after sending. The consumer Lambda hasn’t processed them yet.
5.5: Peeking into the Queue (again!)
Wait about 5-10 seconds for the Consumer Lambda to process the initial batch of messages. Then, re-run the peek script. The following output should be displayed:
{ "ReceiveMessageResponse": { "ReceiveMessageResult": { "Message": [ { "MessageId": "229da923-8738-456f-8c9a-085f19ca2780", // <-- NEW Message ID "Body": "{\"jobId\": \"fd752c6e-9253-404c-b0ff-114a1e341600\", ..., \"originalMessageId\": \"d3afc10c-f820-4845-aa70-89776224d131\", \"retryCount\": 1}", // <-- Note retryCount = 1 "Attribute": [ ..., { "Name": "IsVisible", "Value": "true" }, { "Name": "IsDelayed", "Value": "false" } ] // <-- Retry message now visible }, { "MessageId": "7040380a-8fea-4845-9b43-0bdf1e35af10", // <-- NEW Message ID "Body": "{\"jobId\": \"ca1f8a4a-2cf7-49b4-9060-e829ac72dbb8\", ..., \"originalMessageId\": \"597c3444-2c12-49da-b1db-65ede215976e\", \"retryCount\": 1}", // <-- Note retryCount = 1 "Attribute": [ ..., { "Name": "IsVisible", "Value": "true" }, { "Name": "IsDelayed", "Value": "false" } ] // <-- Retry message now visible } ] }, ... }}
We can observe the following:
- The success message (`task-001`) is gone: processed and deleted by the consumer.
- The original `transient_fail` messages are also gone.
- Two new messages with `retryCount: 1` have appeared, referencing the `originalMessageId`s of the failed jobs.
These are retry messages from the consumer Lambda, sent with `DelaySeconds=15`. Even if 15 seconds haven’t passed, the LocalStack developer endpoint (`/_aws/sqs/messages?ShowDelayed=true`) shows them immediately, unlike a standard `ReceiveMessage` call, which wouldn’t show them.
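To catch these transitions live instead of re-running the script by hand, you can poll it continuously (assuming the common `watch` utility is installed):

```bash
# Re-run the peek script every 2 seconds to watch state transitions
watch -n 2 python src/peek_queue.py
```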
5.6: Peeking into the Queue (once more!)
Wait another 10–15 seconds. The first retry messages (with a 15s delay) become visible, are reprocessed as `transient_fail`, and are requeued for another delayed retry. Depending on your timing, a further retry cycle may complete as well, as in the sample output below. Run the peek script again to observe this.
{ "ReceiveMessageResponse": { "ReceiveMessageResult": { "Message": [ { "MessageId": "d0fc6653-93d0-47b6-8878-03722f0e97e9", // <-- NEW Message ID (3rd retry) "Body": "{\"jobId\": \"fd752c6e-9253-404c-b0ff-114a1e341600\", ..., \"originalMessageId\": \"d3afc10c-f820-4845-aa70-89776224d131\", \"retryCount\": 3}", // <-- Retry count increased to 3 "Attribute": [ ..., { "Name": "IsVisible", "Value": "true" }, { "Name": "IsDelayed", "Value": "false" } ] // <-- Final retry attempt }, { "MessageId": "38dbe562-64a1-48d4-8203-e34a43ab0ea7", // <-- NEW Message ID (3rd retry) "Body": "{\"jobId\": \"ca1f8a4a-2cf7-49b4-9060-e829ac72dbb8\", ..., \"originalMessageId\": \"597c3444-2c12-49da-b1db-65ede215976e\", \"retryCount\": 3}", // <-- Retry count increased to 3 "Attribute": [ ..., { "Name": "IsVisible", "Value": "true" }, { "Name": "IsDelayed", "Value": "false" } ] // <-- Final retry attempt } ] }, ... }}
We can observe the following:
- The messages from the previous peek (with `retryCount: 1`) are gone.
- Two new messages are present. Their bodies now show `retryCount: 3`.
This shows the retry cycles in action: the consumer processed the `retryCount: 1` messages, requeued them, and by this peek has worked through the `retryCount: 2` generation as well, scheduling these final delayed retries. Again, the peek endpoint shows them immediately.
5.7: Peeking into the Queue (one final time!)
Wait another 10–15 seconds. The final set of retry messages (with `retryCount: 3`) should now be visible and processed. Since `retryCount = MAX_RETRIES`, the consumer stops retrying. Run the peek script again.
{ "ReceiveMessageResponse": { "ReceiveMessageResult": null, // <-- Queue is empty! Max retries reached, messages processed and deleted "ResponseMetadata": { ... } }}
The queue is now empty. The consumer handled the final messages as `failed_max_retries` and returned successfully. SQS deleted them, as no failure was reported.
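If you want to re-run the demo from a clean slate, you can purge any leftover messages from the queue first:

```bash
# Removes all messages (visible, in-flight, and delayed) from the queue
awslocal sqs purge-queue --queue-url "${QUEUE_URL}"
```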
Summary
We did it! We built a local SQS workflow using LocalStack, deployed producer and consumer Lambdas, and used the SQS developer endpoint to inspect delayed and in-flight messages without disrupting the queue or resorting to guesswork!
This capability lets us trace transient failures and retries end-to-end, giving us full visibility into the message lifecycle locally.