A Blog

Where I write anything about everything every once in a while

25 Aug 2020

Spark, AWS Fargate, and S3

Recently I went down a long and winding road to get my Spark submit jobs running as containers in AWS’s Fargate container service. While I already had a well-established manual Spark environment, it needed to be automated, and running the submit as a Fargate container offered several advantages.

  • Job lifecycle is trivial with a container
  • Automagic logging to CloudWatch
  • Fargate allows for efficient use of resources

Basic Details

Using Spark 2.4.6, we have PySpark scripts running that output to S3. To interact with S3, Spark relies on Hadoop, and you must specify the Hadoop S3 package when you submit the job.

The Spark cluster is in standalone mode, which means the driver deploy mode is client and not cluster. This has the advantage that the container lifecycle matches the job lifecycle.

The cluster is running on regular EC2 instances.

The Long Road

Having previously worked out how to make the above work, I assumed moving the submit portion to containers would be straightforward.

Where possible, I will include some of the errors one may encounter.

This blog post is as much a reminder for future me as it is for someone walking the same path.

Initial Setup

A submit.sh script is used for executing the submit inside a container. The submit may initially look like this:

spark/bin/spark-submit \
  --packages org.apache.hadoop:hadoop-aws:2.7.3 \
  --master $SPARK_MASTER \
  ./script.py

Dreaded “Initial job has not accepted any resources”

When running the container in Fargate, you will encounter an error and your job will not complete. The logs will show the job continually attempting to start, but reporting:

TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory

This error is unfortunate and may lead you to diagnose the wrong problem. It typically indicates that the job has resource requirements that exceed the cluster’s resources.

What happens is that not only does the Spark driver contact the master, but the worker nodes also make connections back to the driver. In the event the workers cannot connect to the driver, you will get the above message.

In other words, the “Initial job has not accepted any resources” message is actually a networking issue. To fix it, we need to open the appropriate ports, which means our Fargate task definition must declare them.

Spark itself will assign random ports, so we must override this behavior. We will arbitrarily choose ports 6001 and 6002 for this use.

It is also necessary to provide the IP address of the container as the Spark driver host. To do this we will use some of the ECS metadata capabilities. The metadata contains other items that may be of interest; the data may be retrieved from the URL in the ECS_CONTAINER_METADATA_URI environment variable.
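
For reference, the network portion of that metadata looks roughly like the snippet below (a quick peek using jq, which is introduced later; the IP address in the comments is purely illustrative):

# Peek at the network portion of the container metadata. The first IPv4
# address listed is the one we will hand to Spark as the driver host.
curl -s "$ECS_CONTAINER_METADATA_URI" | jq .Networks
# [
#   {
#     "NetworkMode": "awsvpc",
#     "IPv4Addresses": [
#       "10.0.2.106"
#     ]
#   }
# ]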

Task Changes

  "portMappings": [
    {
      "hostPort": 6001,
      "protocol": "tcp",
      "containerPort": 6001
    },
    {
      "hostPort": 6002,
      "protocol": "tcp",
      "containerPort": 6002
    }
  ]

Script Changes

For simplicity, a tool called jq is used for pulling values out of JSON. You could use built-in tools like sed and awk, but jq provides the simplest interface. If you use jq, you will need to include it in your container. It may also be necessary to install curl.

CONTAINER_DATA=$(curl ${ECS_CONTAINER_METADATA_URI})
IP_ADDRESS=$(echo $CONTAINER_DATA | jq -rj .Networks[0].IPv4Addresses[0])

spark/bin/spark-submit \
  --packages org.apache.hadoop:hadoop-aws:2.7.3 \
  --conf spark.blockManager.port=6002 \
  --conf spark.driver.port=6001 \
  --conf spark.driver.host=$IP_ADDRESS \
  --master $SPARK_MASTER \
  ./script.py

S3 Access

With the job finally being submitted, it will now encounter:

Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden; Request ID: 4924C7412B236F53)

The script cannot write to S3 and is rejected with an access-denied (403) error. This was far and away the toughest nut to crack. While it is possible to include AWS credentials directly in the container, from a security and source control perspective that isn’t recommended. Permissions really need to be driven by IAM.

Fargate doesn’t handle permissions the same way EC2 does, and things that previously worked may stop working. It was clear that something in the Hadoop S3 interface wasn’t working.

AWS provides credentials via a different metadata URL. From within the container, you can find this data at http://169.254.170.2$AWS_CONTAINER_CREDENTIALS_RELATIVE_URI. The credentials are temporary credentials, though. This means they supply values for AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, and AWS_SESSION_TOKEN. All three values are required for access, and they come with an expiration.
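
Peeking at the payload from inside the container shows which fields are involved (a quick sketch using jq; the field names belong to the ECS credentials endpoint, and the mapping to the environment variables happens in our script further down):

# List the fields in the temporary credentials payload. Do not log the values.
curl -s "http://169.254.170.2$AWS_CONTAINER_CREDENTIALS_RELATIVE_URI" | jq 'keys'
# ["AccessKeyId","Expiration","RoleArn","SecretAccessKey","Token"]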

After some digging, I discovered the AWS SDK within the Hadoop 2.7.3 build is 4-5 years old and predates Fargate and temporary access tokens.

  • Changing the Hadoop version number in your spark-submit does not work; the version must match what was distributed with Spark
  • Specifying all manner of credential providers does not work either (both dead ends are sketched below)
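
For the record, those attempts looked roughly like the following. These are sketches of what not to do, included only to save you the detour:

# Dead end 1: bumping the hadoop-aws package past the Hadoop 2.7.3 bundled
# with the stock Spark 2.4.6 build. The versions must match, so this fails.
spark/bin/spark-submit \
  --packages org.apache.hadoop:hadoop-aws:2.8.5 \
  --master $SPARK_MASTER \
  ./script.py

# Dead end 2: naming a credentials provider that understands session tokens.
# hadoop-aws 2.7.3 does not ship this class, and its bundled AWS SDK predates
# session token support anyway.
spark/bin/spark-submit \
  --packages org.apache.hadoop:hadoop-aws:2.7.3 \
  --conf spark.hadoop.fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider \
  --master $SPARK_MASTER \
  ./script.py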

It turns out the recently released Spark 3.0.0 has a much newer Hadoop build, but upgrading to Spark 3.0.0 isn’t desirable yet.

Roll Your Own Distribution

Spark provides downloads without the Hadoop libraries bundled, which makes it possible to supply a different version. For the Spark 2.4 line, we must stick with Hadoop < 3.x.

Hadoop 2.8.5 includes an AWS SDK that can support our temporary access tokens.

Your container will need to include the appropriate version of Spark without Hadoop, as well as Hadoop 2.8.5. Connecting them together is straightforward:

export SPARK_DIST_CLASSPATH=$(hadoop-2.8.5/bin/hadoop classpath)
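
Getting those two pieces into the image in the first place might look roughly like this (a sketch only; the Apache archive URLs and the spark directory name are assumptions based on the paths used in this post, so verify them before baking them into your Dockerfile):

# Download Spark 2.4.6 built without Hadoop plus a standalone Hadoop 2.8.5.
curl -sSL https://archive.apache.org/dist/spark/spark-2.4.6/spark-2.4.6-bin-without-hadoop.tgz | tar -xz
curl -sSL https://archive.apache.org/dist/hadoop/common/hadoop-2.8.5/hadoop-2.8.5.tar.gz | tar -xz

# Rename so the spark/bin/spark-submit path used throughout keeps working.
mv spark-2.4.6-bin-without-hadoop spark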

Use Temporary Credentials

The last step is to fetch the temporary credentials and tell Spark how to use them.

CONTAINER_CREDENTIALS=$(curl http://169.254.170.2$AWS_CONTAINER_CREDENTIALS_RELATIVE_URI)

export AWS_ACCESS_KEY_ID=$(echo $CONTAINER_CREDENTIALS | jq -rj .AccessKeyId)
export AWS_SECRET_ACCESS_KEY=$(echo $CONTAINER_CREDENTIALS | jq -rj .SecretAccessKey)
export AWS_SESSION_TOKEN=$(echo $CONTAINER_CREDENTIALS | jq -rj .Token)

spark/bin/spark-submit \
  --packages org.apache.hadoop:hadoop-aws:2.8.5 \
  --conf spark.hadoop.fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider \
  --conf spark.blockManager.port=6002 \
  --conf spark.driver.port=6001 \
  --conf spark.driver.host=$IP_ADDRESS \
  --master $SPARK_MASTER \
  ./script.py
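
Note that the hadoop-aws package version here matches the Hadoop 2.8.5 build wired in through SPARK_DIST_CLASSPATH; mixing in the old 2.7.3 package would reintroduce the outdated AWS SDK we just worked around.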