spotify / scio

@@ -19,10 +19,11 @@
Loading
19 19
20 20
import java.io.StringReader
21 21
import java.util.UUID
22 -
23 22
import com.google.api.client.json.JsonObjectParser
24 23
import com.google.api.client.json.jackson2.JacksonFactory
25 24
import com.google.api.services.bigquery.model.TableSchema
25 +
import com.spotify.scio.bigquery.types.SchemaUtil
26 +
import org.apache.avro.Schema
26 27
27 28
/** Utility for BigQuery data types. */
28 29
object BigQueryUtil {
@@ -36,4 +37,23 @@
Loading
36 37
  /* Generates job ID */
37 38
  def generateJobId(projectId: String): String =
38 39
    projectId + "-" + UUID.randomUUID().toString
40 +
41 +
  /** Validates if all selectedFields are in the set of schemaFieldNames */
42 +
  def validateSelectedFields(
43 +
    tableSpec: String,
44 +
    avroSchema: Schema,
45 +
    selectedFields: List[String]
46 +
  ): Unit = {
47 +
    // check if selected fields are defined in the schema of the table
48 +
    if (selectedFields.nonEmpty) {
49 +
      val schemaFieldNames = SchemaUtil.recordPathPrefixedFieldNames(avroSchema)
50 +
      val notFoundFields = selectedFields.toSet -- schemaFieldNames.toSet
51 +
52 +
      if (notFoundFields.nonEmpty) {
53 +
        throw new IllegalArgumentException(
54 +
          s"Fields '${notFoundFields.mkString(",")}' not found in the '$tableSpec' table"
55 +
        )
56 +
      }
57 +
    }
58 +
  }
39 59
}

@@ -92,6 +92,13 @@
Loading
92 92
    val (table, args, selectedFields, rowRestriction) = extractStorageArgs(c)
93 93
    val tableSpec = BigQueryPartitionUtil.latestTable(bigquery, formatString(table :: args))
94 94
    val avroSchema = bigquery.tables.storageReadSchema(tableSpec, selectedFields, rowRestriction)
95 +
96 +
    BigQueryUtil.validateSelectedFields(
97 +
      tableSpec,
98 +
      avroSchema,
99 +
      selectedFields
100 +
    )
101 +
95 102
    val schema = StorageUtil.toTableSchema(avroSchema)
96 103
    val traits = List(tq"${p(c, SType)}.HasStorageOptions")
97 104
    val overrides = List(

@@ -18,9 +18,12 @@
Loading
18 18
package com.spotify.scio.bigquery.types
19 19
20 20
import java.util.{List => JList}
21 -
22 21
import com.google.api.services.bigquery.model.{TableFieldSchema, TableSchema}
22 +
import org.apache.avro.Schema
23 +
import org.apache.avro.Schema.{Type => AvroSchemaType}
23 24
25 +
import scala.annotation.tailrec
26 +
import scala.collection.immutable.Queue
24 27
import scala.jdk.CollectionConverters._
25 28
26 29
/** Utility for BigQuery schemas. */
@@ -137,4 +140,46 @@
Loading
137 140
    "with",
138 141
    "yield"
139 142
  )
143 +
144 +
  /**
145 +
   * Traverses (using BFS) Avro records tree and returns the set of all path prefixed names of all
146 +
   * fields in all records including names of record fields. E.g. for schema:
147 +
   *
148 +
   * { "type": "record", "fields": [ { "name": "r1", "type": { "type": "record", "fields": [ {
149 +
   * "name": "str", "type": "string" }, { "name": "int", "type": "long" } ] } } ] }
150 +
   *
151 +
   * It would return Set(r1, r1.str, r1.int)
152 +
   */
153 +
  private[scio] def recordPathPrefixedFieldNames(schema: Schema): Set[String] = {
154 +
    @tailrec
155 +
    def go(records: Queue[(Schema, String)], names: Set[String]): Set[String] = {
156 +
      if (records.isEmpty) {
157 +
        names
158 +
      } else {
159 +
        records.dequeueOption match {
160 +
          case Some(((record, prefix), queue)) =>
161 +
            val (newRecords, newNames) = record.getFields.asScala.foldLeft((queue, names)) {
162 +
              case ((q, ns), field) =>
163 +
                val recordSchema = field.schema()
164 +
                val recordPrefix = prefix + field.name()
165 +
                (
166 +
                  if (recordSchema.getType == AvroSchemaType.RECORD)
167 +
                    q.enqueue((recordSchema, recordPrefix + "."))
168 +
                  else q,
169 +
                  ns + recordPrefix
170 +
                )
171 +
            }
172 +
            go(newRecords, newNames)
173 +
          case None => names
174 +
        }
175 +
      }
176 +
    }
177 +
178 +
    if (schema.getType != AvroSchemaType.RECORD) {
179 +
      Set.empty
180 +
    } else {
181 +
      val queue = Queue((schema, ""))
182 +
      go(queue, Set.empty)
183 +
    }
184 +
  }
140 185
}
1
coverage:
2
  status:
3
    project: no
4
    patch: no
5
    changes: no
Sunburst
The inner-most circle is the entire project, moving away from the center are folders then, finally, a single file. The size and color of each slice is representing the number of statements and the coverage, respectively.
Icicle
The top section represents the entire project. Proceeding with folders and finally individual files. The size and color of each slice is representing the number of statements and the coverage, respectively.
Grid
Each block represents a single file in the project. The size and color of each block is represented by the number of statements and the coverage, respectively.
Loading