@@ -37,13 +37,14 @@ import org.apache.spark.sql.execution.streaming.checkpointing.OffsetSeqMetadata
import org.apache.spark.sql.execution.streaming.operators.stateful.StatefulOperatorsUtils
import org.apache.spark.sql.execution.streaming.operators.stateful.join.StreamingSymmetricHashJoinHelper.{LeftSide, RightSide}
import org.apache.spark.sql.execution.streaming.operators.stateful.join.SymmetricHashJoinStateManager
import org.apache.spark.sql.execution.streaming.operators.stateful.transformwithstate.{TransformWithStateOperatorProperties, TransformWithStateVariableInfo}
import org.apache.spark.sql.execution.streaming.operators.stateful.transformwithstate.{StateStoreColumnFamilySchemaUtils, StateVariableType, TransformWithStateOperatorProperties, TransformWithStateVariableInfo}
import org.apache.spark.sql.execution.streaming.operators.stateful.transformwithstate.timers.TimerStateUtils
import org.apache.spark.sql.execution.streaming.runtime.StreamingCheckpointConstants.DIR_NAME_STATE
import org.apache.spark.sql.execution.streaming.runtime.StreamingQueryCheckpointMetadata
import org.apache.spark.sql.execution.streaming.state.{InMemoryStateSchemaProvider, KeyStateEncoderSpec, NoPrefixKeyStateEncoderSpec, PrefixKeyScanStateEncoderSpec, RocksDBStateStoreProvider, StateSchemaCompatibilityChecker, StateSchemaMetadata, StateSchemaProvider, StateStore, StateStoreColFamilySchema, StateStoreConf, StateStoreId, StateStoreProviderId}
import org.apache.spark.sql.execution.streaming.state.OfflineStateRepartitionErrors
import org.apache.spark.sql.execution.streaming.utils.StreamingUtils
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.DataSourceRegister
import org.apache.spark.sql.streaming.TimeMode
import org.apache.spark.sql.types.StructType
@@ -75,8 +76,7 @@ class StateDataSource extends TableProvider with DataSourceRegister with Logging
sourceOptions.resolvedCpLocation,
stateConf.providerClass)
}
val stateStoreReaderInfo: StateStoreReaderInfo = getStoreMetadataAndRunChecks(
sourceOptions)
val stateStoreReaderInfo = getStoreMetadataAndRunChecks(sourceOptions)

// The key state encoder spec should be available for all operators except stream-stream joins
val keyStateEncoderSpec = if (stateStoreReaderInfo.keyStateEncoderSpecOpt.isDefined) {
@@ -90,16 +90,17 @@ class StateDataSource extends TableProvider with DataSourceRegister with Logging
stateStoreReaderInfo.transformWithStateVariableInfoOpt,
stateStoreReaderInfo.stateStoreColFamilySchemaOpt,
stateStoreReaderInfo.stateSchemaProviderOpt,
stateStoreReaderInfo.joinColFamilyOpt)
stateStoreReaderInfo.joinColFamilyOpt,
Option(stateStoreReaderInfo.allColumnFamiliesReaderInfo))
}

override def inferSchema(options: CaseInsensitiveStringMap): StructType = {
val sourceOptions = StateSourceOptions.modifySourceOptions(hadoopConf,
StateSourceOptions.apply(session, hadoopConf, options))

val stateStoreReaderInfo: StateStoreReaderInfo = getStoreMetadataAndRunChecks(
sourceOptions)
val stateStoreReaderInfo = getStoreMetadataAndRunChecks(sourceOptions)
val oldSchemaFilePaths = StateDataSource.getOldSchemaFilePaths(sourceOptions, hadoopConf)
val allCFReaderInfo = stateStoreReaderInfo.allColumnFamiliesReaderInfo

val stateCheckpointLocation = sourceOptions.stateCheckpointLocation
try {
@@ -119,10 +120,13 @@ class StateDataSource extends TableProvider with DataSourceRegister with Logging
(resultSchema.keySchema, resultSchema.valueSchema)
}

val stateVarInfo = stateStoreReaderInfo.transformWithStateVariableInfoOpt
SchemaUtil.getSourceSchema(sourceOptions, keySchema,
valueSchema,
stateStoreReaderInfo.transformWithStateVariableInfoOpt,
stateStoreReaderInfo.stateStoreColFamilySchemaOpt)
stateVarInfo,
stateStoreReaderInfo.stateStoreColFamilySchemaOpt,
allCFReaderInfo.operatorName,
allCFReaderInfo.stateFormatVersion)
} catch {
case NonFatal(e) =>
throw StateDataSourceErrors.failedToReadStateSchema(sourceOptions, e)
@@ -131,6 +135,37 @@ class StateDataSource extends TableProvider with DataSourceRegister with Logging

override def supportsExternalMetadata(): Boolean = false

/**
* Returns the state format version for SYMMETRIC_HASH_JOIN operators.
* This currently supports only join operators because the function is used solely by
* PartitionKeyExtractor, which needs the state format version only for join operators.
*/
private def getStateFormatVersion(
storeMetadata: Array[StateMetadataTableEntry]): Option[Int] = {
if (storeMetadata.nonEmpty &&
storeMetadata.head.operatorName == StatefulOperatorsUtils.SYMMETRIC_HASH_JOIN_EXEC_OP_NAME) {
Some(session.conf.get(SQLConf.STREAMING_JOIN_STATE_FORMAT_VERSION))
} else {
None
}
}

/**
* Returns true if this is a read-all-column-families request for a stream-stream join
* that uses virtual column families (state format version 3).
*/
private def isReadAllColFamiliesOnJoinV3(
sourceOptions: StateSourceOptions,
storeMetadata: Array[StateMetadataTableEntry]): Boolean = {
sourceOptions.internalOnlyReadAllColumnFamilies &&
  storeMetadata.headOption.exists(
    _.operatorName == StatefulOperatorsUtils.SYMMETRIC_HASH_JOIN_EXEC_OP_NAME) &&
StreamStreamJoinStateHelper.usesVirtualColumnFamilies(
hadoopConf,
sourceOptions.stateCheckpointLocation.toString,
sourceOptions.operatorId)
}

private def buildStateStoreConf(checkpointLocation: String, batchId: Long): StateStoreConf = {
val offsetLog = new StreamingQueryCheckpointMetadata(session, checkpointLocation).offsetLog
offsetLog.get(batchId) match {
@@ -177,7 +212,12 @@ class StateDataSource extends TableProvider with DataSourceRegister with Logging

val stateVars = twsOperatorProperties.stateVariables
val stateVarInfo = stateVars.filter(stateVar => stateVar.stateName == stateVarName)
if (stateVarInfo.size != 1) {
// This check ensures that exactly one stateVarInfo exists in stateVars.
// We skip it when testing internal column correctness by querying through Spark,
// because internal columns (e.g., $ttl_, $min_, $count_) are not part of the user-defined
// state variables and therefore not registered in stateVars.
if (stateVarInfo.size != 1 &&
!StateStoreColumnFamilySchemaUtils.isTestingInternalColFamily(stateVarName)) {
throw StateDataSourceErrors.invalidOptionValue(STATE_VAR_NAME,
s"State variable $stateVarName is not defined for the transformWithState operator.")
}
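
Note: StateStoreColumnFamilySchemaUtils.isTestingInternalColFamily is called here but defined
outside the shown hunks. A minimal sketch of the assumed check, matching the internal prefixes
named in the comment above (the real helper may well differ):

// Hypothetical sketch only; the actual helper is not part of this diff.
def isTestingInternalColFamily(colFamilyName: String): Boolean =
  Seq("$ttl_", "$min_", "$count_").exists(colFamilyName.startsWith)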
@@ -239,17 +279,25 @@ class StateDataSource extends TableProvider with DataSourceRegister with Logging
}
}

private def getStoreMetadataAndRunChecks(sourceOptions: StateSourceOptions):
StateStoreReaderInfo = {
private def getStoreMetadataAndRunChecks(
sourceOptions: StateSourceOptions): StateStoreReaderInfo = {
val storeMetadata = StateDataSource.getStateStoreMetadata(sourceOptions, hadoopConf)
runStateVarChecks(sourceOptions, storeMetadata)
if (!sourceOptions.internalOnlyReadAllColumnFamilies) {
// Skip runStateVarChecks when reading all column families (for repartitioning) because:
// 1. We're not targeting a specific state variable, so stateVarName won't be specified
// 2. The validation logic assumes a single state variable is being queried
// 3. For repartitioning, we need to read all column families without these constraints
runStateVarChecks(sourceOptions, storeMetadata)
}

var keyStateEncoderSpecOpt: Option[KeyStateEncoderSpec] = None
var stateStoreColFamilySchemaOpt: Option[StateStoreColFamilySchema] = None
var transformWithStateVariableInfoOpt: Option[TransformWithStateVariableInfo] = None
var stateSchemaProvider: Option[StateSchemaProvider] = None
var joinColFamilyOpt: Option[String] = None
var timeMode: String = TimeMode.None.toString
var stateStoreColFamilySchemas: Set[StateStoreColFamilySchema] = Set.empty
var stateVariableInfos: List[TransformWithStateVariableInfo] = List.empty

if (sourceOptions.joinSide == JoinSideValues.none) {
var stateVarName = sourceOptions.stateVarName
@@ -267,14 +315,33 @@ class StateDataSource extends TableProvider with DataSourceRegister with Logging

if (sourceOptions.readRegisteredTimers) {
stateVarName = TimerStateUtils.getTimerStateVarNames(timeMode)._1
} else if (sourceOptions.internalOnlyReadAllColumnFamilies) {
// When reading all column families (for repartitioning) for a TWS operator,
// choose an arbitrary state variable as a placeholder for the default column family,
// because a matching stateVariableInfo and stateStoreColFamilySchemaOpt are needed
// later to infer the schema (the partition key in particular).
stateVarName = operatorProperties.stateVariables.head.stateName
}

val stateVarInfoList = operatorProperties.stateVariables
if (sourceOptions.internalOnlyReadAllColumnFamilies) {
stateVariableInfos = operatorProperties.stateVariables
}
Contributor Author: This is the same as the previous version except for indentation. We can
now assign transformWithStateVariableInfoOpt because stateVarName will always be a "valid"
value after the line 323 change.

var stateVarInfoList = operatorProperties.stateVariables
  .filter(stateVar => stateVar.stateName == stateVarName)
if (stateVarInfoList.isEmpty &&
StateStoreColumnFamilySchemaUtils.isTestingInternalColFamily(stateVarName)) {
// Pass this dummy TransformWithStateVariableInfo for a TWS internal column family during
// testing, because internal columns are not registered in operatorProperties.stateVariables,
// so stateVarInfoList would otherwise be empty.
stateVarInfoList = List(TransformWithStateVariableInfo(
stateVarName, StateVariableType.ValueState, false
))
}
require(stateVarInfoList.size == 1, s"Failed to find unique state variable info " +
s"for state variable $stateVarName in operator ${sourceOptions.operatorId}")
val stateVarInfo = stateVarInfoList.head
transformWithStateVariableInfoOpt = Some(stateVarInfo)

val schemaFilePaths = storeMetadataEntry.stateSchemaFilePaths
val stateSchemaMetadata = StateSchemaMetadata.createStateSchemaMetadata(
sourceOptions.stateCheckpointLocation.toString,
@@ -305,9 +372,23 @@ class StateDataSource extends TableProvider with DataSourceRegister with Logging
oldSchemaFilePaths = oldSchemaFilePaths)
val stateSchema = manager.readSchemaFile()

if (sourceOptions.internalOnlyReadAllColumnFamilies) {
// Store all column family schemas for multi-CF reading.
// Convert to Set to ensure no duplicates and avoid processing same CF twice.
stateStoreColFamilySchemas = stateSchema.toSet
}
// When reading all column families for Join V3, no specific state variable is targeted,
// so stateVarName defaults to DEFAULT_COL_FAMILY_NAME.
// However, Join V3 does not have a "default" column family, so we pick the first
// schema as resultSchema, which serves as a placeholder for the default column family
// schema in StatePartitionAllColumnFamiliesReader.
val resultSchema = if (isReadAllColFamiliesOnJoinV3(sourceOptions, storeMetadata)) {
stateSchema.head
} else {
stateSchema.filter(_.colFamilyName == stateVarName).head
}
// Based on the version and read schema, populate the keyStateEncoderSpec used for
// reading the column families
val resultSchema = stateSchema.filter(_.colFamilyName == stateVarName).head
keyStateEncoderSpecOpt = Some(getKeyStateEncoderSpec(resultSchema, storeMetadata))
stateStoreColFamilySchemaOpt = Some(resultSchema)
} catch {
@@ -316,12 +397,16 @@ class StateDataSource extends TableProvider with DataSourceRegister with Logging
}
}

val operatorName = if (storeMetadata.nonEmpty) storeMetadata.head.operatorName else ""
val stateFormatVersion = getStateFormatVersion(storeMetadata)
StateStoreReaderInfo(
keyStateEncoderSpecOpt,
stateStoreColFamilySchemaOpt,
transformWithStateVariableInfoOpt,
stateSchemaProvider,
joinColFamilyOpt
joinColFamilyOpt,
AllColumnFamiliesReaderInfo(
stateStoreColFamilySchemas, stateVariableInfos, operatorName, stateFormatVersion)
)
}

@@ -708,7 +793,9 @@ case class StateStoreReaderInfo(
stateStoreColFamilySchemaOpt: Option[StateStoreColFamilySchema],
transformWithStateVariableInfoOpt: Option[TransformWithStateVariableInfo],
stateSchemaProviderOpt: Option[StateSchemaProvider],
joinColFamilyOpt: Option[String] // Only used for join op with state format v3
joinColFamilyOpt: Option[String], // Only used for join op with state format v3
// Info for reading all column families - used when internalOnlyReadAllColumnFamilies=true
allColumnFamiliesReaderInfo: AllColumnFamiliesReaderInfo
)
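
Note: AllColumnFamiliesReaderInfo is constructed and consumed in the hunks above, but its
definition is outside the shown hunks. A sketch inferred from the constructor call and the
field accesses in this diff (stateStoreColFamilySchemas, stateVariableInfos, operatorName,
stateFormatVersion); the types are assumptions from usage, not confirmed by the diff:

// Inferred sketch; the real definition lives elsewhere in this PR.
case class AllColumnFamiliesReaderInfo(
    stateStoreColFamilySchemas: Set[StateStoreColFamilySchema],
    stateVariableInfos: List[TransformWithStateVariableInfo],
    operatorName: String,
    stateFormatVersion: Option[Int])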

object StateDataSource {