Executors may fail when fetching map statuses while the map statuses are being modified:
Unable to deserialize broadcasted map statuses for shuffle 1: java.io.IOException: org.apache.spark.SparkException:
[INTERNAL_ERROR_BROADCAST] Failed to get broadcast_3_piece0 of broadcast_3 SQLSTATE: XX000
The issue occurs when:
- an executor fetches map statuses via getStatuses (to learn where to read shuffle data from); those map statuses are too large, so the driver wraps them into a broadcast variable
- the broadcast variable gets deserialized by the executor (which works) and the value (the map statuses) is fetched from the driver
- in the meantime, an updateMapOutput occurs on the driver, which invalidates the cached broadcast variable and deletes the broadcast variable's value; the executor then cannot fetch the broadcast variable's value and throws an exception

There is no problem with the read and write locks in MapOutputTracker; this issue is outside the scope of those locks.
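The ordering above can be modeled without Spark at all. The sketch below is a hypothetical, simplified stand-in (none of these names are Spark's real API): a driver-side store hands out a broadcast id, a concurrent invalidation deletes the value behind it, and the executor's later dereference fails even though deserializing the handle succeeded.

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical, simplified model of the driver-side broadcast store.
// Only the ordering of operations matters, not the data.
object BroadcastRaceModel {
  private val store = new ConcurrentHashMap[Long, Array[Byte]]()

  // models serializedMapStatus: cache the statuses behind a broadcast id
  def serialize(id: Long): Long = {
    store.put(id, Array[Byte](1, 2, 3)) // pretend-serialized map statuses
    id
  }

  // models updateMapOutput: invalidate the cache and destroy the value
  def invalidate(id: Long): Unit = store.remove(id)

  // models the executor dereferencing the broadcast: the handle was fine,
  // but the value behind it may already be gone
  def fetchValue(id: Long): Array[Byte] =
    Option(store.get(id)).getOrElse(
      throw new java.io.IOException(s"Failed to get broadcast_${id}_piece0"))
}

object BroadcastRaceDemo extends App {
  val id = BroadcastRaceModel.serialize(0L) // driver wraps statuses in a broadcast
  BroadcastRaceModel.invalidate(id)         // concurrent updateMapOutput deletes it
  try BroadcastRaceModel.fetchValue(id)     // executor fetch now fails
  catch { case e: java.io.IOException => println(e.getMessage) }
}
```

The window between serialize and fetchValue is exactly the window in which updateMapOutput on the driver makes the executor's fetch fail.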
This can be reproduced:
// serialize map status broadcast variable
val mapStatusesBytes = shuffleStatus.serializedMapStatus(tracker.broadcastManager, tracker.isLocal, 0, sc.getConf)
// update map status, invalidates broadcast variable
shuffleStatus.updateMapOutput(1, BlockManagerId(s"exec-2", "host", 2000))
// give broadcast variable some time to disappear
Thread.sleep(100)
// deserializing the broadcast map statuses fails
MapOutputTracker.deserializeOutputStatuses[MapStatus](mapStatusesBytes, sc.getConf)

full exception
Unable to deserialize broadcasted output statuses
org.apache.spark.SparkException: Unable to deserialize broadcasted output statuses
at org.apache.spark.MapOutputTracker$.deserializeOutputStatuses(MapOutputTracker.scala:1672)
at org.apache.spark.MapOutputTrackerSuite.$anonfun$new$49(MapOutputTrackerSuite.scala:778)
at org.apache.spark.MapOutputTrackerSuite.$anonfun$new$49$adapted(MapOutputTrackerSuite.scala:758)
at org.apache.spark.LocalSparkContext$.withSpark(LocalSparkContext.scala:65)
at org.apache.spark.MapOutputTrackerSuite.$anonfun$new$48(MapOutputTrackerSuite.scala:758)
...
Caused by: java.io.IOException: org.apache.spark.SparkException: [INTERNAL_ERROR_BROADCAST] Failed to get broadcast_0_piece0 of broadcast_0 SQLSTATE: XX000
at org.apache.spark.util.SparkErrorUtils.tryOrIOException(SparkErrorUtils.scala:44)
at org.apache.spark.util.SparkErrorUtils.tryOrIOException$(SparkErrorUtils.scala:35)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:96)
at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:255)
at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:110)
at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
at org.apache.spark.MapOutputTracker$.deserializeOutputStatuses(MapOutputTracker.scala:1659)
... 61 more
Caused by: org.apache.spark.SparkException: [INTERNAL_ERROR_BROADCAST] Failed to get broadcast_0_piece0 of broadcast_0 SQLSTATE: XX000
at org.apache.spark.SparkException$.internalError(SparkException.scala:104)
at org.apache.spark.SparkException$.internalError(SparkException.scala:112)
at org.apache.spark.broadcast.TorrentBroadcast.$anonfun$readBlocks$1(TorrentBroadcast.scala:226)
at scala.runtime.java8.JFunction1$mcVI$sp.apply(JFunction1$mcVI$sp.scala:18)
at scala.collection.immutable.Vector.foreach(Vector.scala:2125)
at org.apache.spark.broadcast.TorrentBroadcast.readBlocks(TorrentBroadcast.scala:195)
at org.apache.spark.broadcast.TorrentBroadcast.$anonfun$readBroadcastBlock$4(TorrentBroadcast.scala:284)
at scala.Option.getOrElse(Option.scala:201)
at org.apache.spark.broadcast.TorrentBroadcast.$anonfun$readBroadcastBlock$2(TorrentBroadcast.scala:260)
at org.apache.spark.util.KeyLock.withLock(KeyLock.scala:64)
at org.apache.spark.broadcast.TorrentBroadcast.$anonfun$readBroadcastBlock$1(TorrentBroadcast.scala:255)
at org.apache.spark.util.SparkErrorUtils.tryOrIOException(SparkErrorUtils.scala:37)
... 67 more
full unit test
test("SPARK-38101: concurrent updateMapOutput not interfering with getStatuses") {
val newConf = new SparkConf
newConf.set(RPC_MESSAGE_MAX_SIZE, 1)
newConf.set(RPC_ASK_TIMEOUT, "1") // Fail fast
newConf.set(SHUFFLE_MAPOUTPUT_MIN_SIZE_FOR_BROADCAST, 0L) // Always send broadcast variables
// needs TorrentBroadcast so need a SparkContext
withSpark(new SparkContext("local", "MapOutputTrackerSuite", newConf)) { sc =>
val hostname = "localhost"
val rpcEnv = createRpcEnv("spark", hostname, 0, new SecurityManager(sc.conf))
val masterTracker = newTrackerMaster(sc.conf)
masterTracker.trackerEndpoint = rpcEnv.setupEndpoint(MapOutputTracker.ENDPOINT_NAME,
new MapOutputTrackerMasterEndpoint(rpcEnv, masterTracker, sc.conf))
val workerRpcEnv = createRpcEnv("spark-slave", hostname, 0, new SecurityManager(sc.conf))
val workerTracker = new MapOutputTrackerWorker(sc.conf)
workerTracker.trackerEndpoint =
workerRpcEnv.setupEndpointRef(rpcEnv.address, MapOutputTracker.ENDPOINT_NAME)
masterTracker.registerShuffle(1, 4, 2)
val bmIdOne = BlockManagerId(s"exec-1", "host", 1000)
val bmIdTwo = BlockManagerId(s"exec-2", "host", 2000)
masterTracker.registerMapOutput(1, 0, MapStatus(bmIdOne, Array(1000L, 10000L), mapTaskId = 0))
masterTracker.registerMapOutput(1, 1, MapStatus(bmIdOne, Array(1000L, 10000L), mapTaskId = 1))
masterTracker.registerMapOutput(1, 2, MapStatus(bmIdTwo, Array(1000L, 10000L), mapTaskId = 2))
masterTracker.registerMapOutput(1, 3, MapStatus(bmIdTwo, Array(1000L, 10000L), mapTaskId = 3))
assert(0 == masterTracker.getNumCachedSerializedBroadcast)
assert(workerTracker.getMapSizesByExecutorId(1, 0).toSeq.length === 2)
assert(workerTracker.getMapSizesByExecutorId(1, 1).toSeq.length === 2)
assert(1 == masterTracker.getNumCachedSerializedBroadcast)
masterTracker.updateMapOutput(1, 0, bmIdOne)
assert(0 == masterTracker.getNumCachedSerializedBroadcast)
// some thread keeps updating the map outputs
@volatile
var updaterShutdown = false
val updater = new Thread(new Runnable {
override def run(): Unit = {
while (!updaterShutdown) {
// map output does not change, but updateMapOutput invalidates cached broadcasts
masterTracker.updateMapOutput(1, 0, bmIdOne)
masterTracker.updateMapOutput(1, 1, bmIdOne)
masterTracker.updateMapOutput(1, 2, bmIdTwo)
masterTracker.updateMapOutput(1, 3, bmIdTwo)
}
}
})
updater.start()
1 to 100 foreach { i =>
assert(workerTracker.getMapSizesByExecutorId(1, 0).toSeq.length === 2)
assert(workerTracker.getMapSizesByExecutorId(1, 1).toSeq.length === 2)
// updating epoch invalidates map status cache in worker, so this always sends requests
workerTracker.updateEpoch(masterTracker.getEpoch + i)
}
// shutdown updater thread
updaterShutdown = true
updater.join(1000)
assert(!updater.isAlive)
}
}

This occurs in multiple situations:
- fetch failures invalidate map status caches (reported in SPARK-38101)
- updates of block locations during shuffle data migration (above unit test)
Ways to mitigate:
- increase spark.shuffle.mapOutput.minSizeForBroadcast, which reduces the chance that map statuses are broadcast at all (not ideal)
- delay the deletion of the broadcast variable on the driver side (this could accumulate many such broadcast variables if many executors are decommissioning (calling updateMapOutput) while many executors call getStatuses, creating and destroying lots of cached broadcast variables; the default minimum size is 0.5 MB)
- retry fetching the map statuses on the executor side until success
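The retry mitigation can be sketched as a generic retry loop around the executor-side deserialize-and-fetch step. This is a hedged sketch, not Spark's actual code; retryFetch and fetchStatuses are hypothetical names, and the assumption is that a failed broadcast fetch surfaces as a java.io.IOException (as in the stack trace above) while a later request lets the driver re-create the broadcast.

```scala
object RetryFetch {
  // Sketch only: retry an operation that can fail transiently because the
  // driver deleted the cached broadcast between serialization and fetch.
  // fetchStatuses stands in for the deserialize-and-fetch step,
  // e.g. MapOutputTracker.deserializeOutputStatuses(...).
  def retryFetch[T](maxAttempts: Int, delayMs: Long)(fetchStatuses: () => T): T = {
    var lastError: Throwable = null
    var attempt = 0
    while (attempt < maxAttempts) {
      try {
        return fetchStatuses()
      } catch {
        // "Failed to get broadcast_X_pieceY" reaches the caller wrapped
        // in an IOException (see the stack trace above)
        case e: java.io.IOException =>
          lastError = e
          attempt += 1
          // on a later round trip the driver re-creates the broadcast for
          // the current map statuses, so a subsequent fetch can succeed
          if (attempt < maxAttempts) Thread.sleep(delayMs)
      }
    }
    throw lastError
  }
}
```

A bounded attempt count keeps the executor from spinning forever if the failure is not transient; after the last attempt the original exception is rethrown.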
This issue has been known since 3.1.2 and still exists in the latest releases and on the master branch.