
[SPARK-38101] Executors fail fetching map statuses with INTERNAL_ERROR_BROADCAST #54723

@EnricoMi

Description


Executors may fail when fetching map statuses while the map statuses are being modified on the driver:

Unable to deserialize broadcasted map statuses for shuffle 1: java.io.IOException: org.apache.spark.SparkException:
[INTERNAL_ERROR_BROADCAST] Failed to get broadcast_3_piece0 of broadcast_3 SQLSTATE: XX000

The issue occurs when:

  1. an executor fetches map statuses via getStatuses (to learn where to read shuffle data from)
  2. the map statuses are too large, so the driver wraps them in a broadcast variable
  3. the executor deserializes the broadcast variable (which works) and then fetches its value (the map statuses) from the driver
  4. in the meantime, an updateMapOutput occurs on the driver, which invalidates the cached broadcast variable and deletes its value
  5. the executor cannot fetch the broadcast variable value and throws an exception

There is no problem with the read and write locks in MapOutputTracker; this issue occurs outside the scope of these locks.

This can be reproduced:

// serialize map status broadcast variable
val mapStatusesBytes = shuffleStatus.serializedMapStatus(tracker.broadcastManager, tracker.isLocal, 0, sc.getConf)

// update map status, invalidates broadcast variable
shuffleStatus.updateMapOutput(1, BlockManagerId("exec-2", "host", 2000))

// give broadcast variable some time to disappear
Thread.sleep(100)

// deserializing the broadcast map statuses fails
MapOutputTracker.deserializeOutputStatuses[MapStatus](mapStatusesBytes, sc.getConf)
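The snippet above assumes a driver-side tracker with a registered shuffle. A minimal sketch of that setup could look as follows; note that `shuffleStatuses` is internal Spark API and the exact accessors are an assumption that may differ across versions:

```scala
// Hypothetical setup for the repro snippet (sketch, internal API may vary):
// obtain the driver-side MapOutputTrackerMaster and register one shuffle
// with a single map output, so that `tracker` and `shuffleStatus` are defined.
val tracker = sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
tracker.registerShuffle(1, 1, 2)
tracker.registerMapOutput(1, 0,
  MapStatus(BlockManagerId("exec-1", "host", 1000), Array(1000L, 10000L), mapTaskId = 0))
// the ShuffleStatus for shuffle 1, as used by the snippet above
val shuffleStatus = tracker.shuffleStatuses(1)
```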
Full exception:
Unable to deserialize broadcasted output statuses
org.apache.spark.SparkException: Unable to deserialize broadcasted output statuses
	at org.apache.spark.MapOutputTracker$.deserializeOutputStatuses(MapOutputTracker.scala:1672)
	at org.apache.spark.MapOutputTrackerSuite.$anonfun$new$49(MapOutputTrackerSuite.scala:778)
	at org.apache.spark.MapOutputTrackerSuite.$anonfun$new$49$adapted(MapOutputTrackerSuite.scala:758)
	at org.apache.spark.LocalSparkContext$.withSpark(LocalSparkContext.scala:65)
	at org.apache.spark.MapOutputTrackerSuite.$anonfun$new$48(MapOutputTrackerSuite.scala:758)
...
Caused by: java.io.IOException: org.apache.spark.SparkException: [INTERNAL_ERROR_BROADCAST] Failed to get broadcast_0_piece0 of broadcast_0 SQLSTATE: XX000
	at org.apache.spark.util.SparkErrorUtils.tryOrIOException(SparkErrorUtils.scala:44)
	at org.apache.spark.util.SparkErrorUtils.tryOrIOException$(SparkErrorUtils.scala:35)
	at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:96)
	at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:255)
	at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:110)
	at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
	at org.apache.spark.MapOutputTracker$.deserializeOutputStatuses(MapOutputTracker.scala:1659)
	... 61 more
Caused by: org.apache.spark.SparkException: [INTERNAL_ERROR_BROADCAST] Failed to get broadcast_0_piece0 of broadcast_0 SQLSTATE: XX000
	at org.apache.spark.SparkException$.internalError(SparkException.scala:104)
	at org.apache.spark.SparkException$.internalError(SparkException.scala:112)
	at org.apache.spark.broadcast.TorrentBroadcast.$anonfun$readBlocks$1(TorrentBroadcast.scala:226)
	at scala.runtime.java8.JFunction1$mcVI$sp.apply(JFunction1$mcVI$sp.scala:18)
	at scala.collection.immutable.Vector.foreach(Vector.scala:2125)
	at org.apache.spark.broadcast.TorrentBroadcast.readBlocks(TorrentBroadcast.scala:195)
	at org.apache.spark.broadcast.TorrentBroadcast.$anonfun$readBroadcastBlock$4(TorrentBroadcast.scala:284)
	at scala.Option.getOrElse(Option.scala:201)
	at org.apache.spark.broadcast.TorrentBroadcast.$anonfun$readBroadcastBlock$2(TorrentBroadcast.scala:260)
	at org.apache.spark.util.KeyLock.withLock(KeyLock.scala:64)
	at org.apache.spark.broadcast.TorrentBroadcast.$anonfun$readBroadcastBlock$1(TorrentBroadcast.scala:255)
	at org.apache.spark.util.SparkErrorUtils.tryOrIOException(SparkErrorUtils.scala:37)
	... 67 more
Full unit test:
test("SPARK-38101: concurrent updateMapOutput not interfering with getStatuses") {
  val newConf = new SparkConf
  newConf.set(RPC_MESSAGE_MAX_SIZE, 1)
  newConf.set(RPC_ASK_TIMEOUT, "1") // Fail fast
  newConf.set(SHUFFLE_MAPOUTPUT_MIN_SIZE_FOR_BROADCAST, 0L) // Always send broadcast variables

  // needs TorrentBroadcast so need a SparkContext
  withSpark(new SparkContext("local", "MapOutputTrackerSuite", newConf)) { sc =>
    val hostname = "localhost"
    val rpcEnv = createRpcEnv("spark", hostname, 0, new SecurityManager(sc.conf))

    val masterTracker = newTrackerMaster(sc.conf)
    masterTracker.trackerEndpoint = rpcEnv.setupEndpoint(MapOutputTracker.ENDPOINT_NAME,
      new MapOutputTrackerMasterEndpoint(rpcEnv, masterTracker, sc.conf))

    val workerRpcEnv = createRpcEnv("spark-slave", hostname, 0, new SecurityManager(sc.conf))
    val workerTracker = new MapOutputTrackerWorker(sc.conf)
    workerTracker.trackerEndpoint =
      workerRpcEnv.setupEndpointRef(rpcEnv.address, MapOutputTracker.ENDPOINT_NAME)

    masterTracker.registerShuffle(1, 4, 2)

    val bmIdOne = BlockManagerId("exec-1", "host", 1000)
    val bmIdTwo = BlockManagerId("exec-2", "host", 2000)
    masterTracker.registerMapOutput(1, 0, MapStatus(bmIdOne, Array(1000L, 10000L), mapTaskId = 0))
    masterTracker.registerMapOutput(1, 1, MapStatus(bmIdOne, Array(1000L, 10000L), mapTaskId = 1))
    masterTracker.registerMapOutput(1, 2, MapStatus(bmIdTwo, Array(1000L, 10000L), mapTaskId = 2))
    masterTracker.registerMapOutput(1, 3, MapStatus(bmIdTwo, Array(1000L, 10000L), mapTaskId = 3))

    assert(0 == masterTracker.getNumCachedSerializedBroadcast)
    assert(workerTracker.getMapSizesByExecutorId(1, 0).toSeq.length === 2)
    assert(workerTracker.getMapSizesByExecutorId(1, 1).toSeq.length === 2)
    assert(1 == masterTracker.getNumCachedSerializedBroadcast)
    masterTracker.updateMapOutput(1, 0, bmIdOne)
    assert(0 == masterTracker.getNumCachedSerializedBroadcast)

    // some thread keeps updating the map outputs
    @volatile
    var updaterShutdown = false
    val updater = new Thread(new Runnable {
      override def run(): Unit = {
        while (!updaterShutdown) {
          // map output does not change, but updateMapOutput invalidates cached broadcasts
          masterTracker.updateMapOutput(1, 0, bmIdOne)
          masterTracker.updateMapOutput(1, 1, bmIdOne)
          masterTracker.updateMapOutput(1, 2, bmIdTwo)
          masterTracker.updateMapOutput(1, 3, bmIdTwo)
        }
      }
    })
    updater.start()

    (1 to 100).foreach { i =>
      assert(workerTracker.getMapSizesByExecutorId(1, 0).toSeq.length === 2)
      assert(workerTracker.getMapSizesByExecutorId(1, 1).toSeq.length === 2)
      // updating epoch invalidates map status cache in worker, so this always sends requests
      workerTracker.updateEpoch(masterTracker.getEpoch + i)
    }

    // shutdown updater thread
    updaterShutdown = true
    updater.join(1000)
    assert(!updater.isAlive)
  }
}

This occurs in multiple situations:

  • fetch failures invalidate map status caches (reported in SPARK-38101)
  • updates of block locations during shuffle data migration (above unit test)

Ways to mitigate:

  • increase spark.shuffle.mapOutput.minSizeForBroadcast, which reduces the chance that broadcast variables are used at all (not ideal)
  • delay the deletion of the broadcast variable value on the driver side; this could accumulate many such broadcast variables when many executors are decommissioning (calling updateMapOutput) while many executors call getStatuses, creating and destroying lots of cached broadcast variables (the default minimum size is 0.5 MB)
  • retry fetching the map statuses on the executor side until it succeeds
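The last mitigation could be sketched as a generic retry helper; `fetchWithRetry` and the backoff values are illustrative, not existing Spark API. The idea is that the fetch can be retried because the driver rebuilds the broadcast variable on the next getStatuses request:

```scala
// Illustrative executor-side mitigation: retry an IOException-failing fetch
// a few times with a simple linear backoff before giving up.
def fetchWithRetry[T](maxAttempts: Int)(fetch: => T): T = {
  var attempt = 1
  while (true) {
    try {
      return fetch
    } catch {
      case _: java.io.IOException if attempt < maxAttempts =>
        Thread.sleep(50L * attempt) // back off before retrying
        attempt += 1
    }
  }
  throw new IllegalStateException("unreachable")
}

// e.g. wrapping the deserialization that currently fails:
// val statuses = fetchWithRetry(3) {
//   MapOutputTracker.deserializeOutputStatuses[MapStatus](mapStatusesBytes, conf)
// }
```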

This issue has existed since 3.1.2 and is still present in the latest releases and on the master branch.

https://issues.apache.org/jira/browse/SPARK-38101
