spotify / scio

@@ -68,6 +68,9 @@
Loading
68 68
69 69
    transform = params.header.fold(transform)(transform.withHeader)
70 70
    transform = params.footer.fold(transform)(transform.withFooter)
71 +
    transform = Option(params.tempDirectory)
72 +
      .map(ScioUtil.toResourceId)
73 +
      .fold(transform)(transform.withTempDirectory)
71 74
72 75
    transform
73 76
  }
@@ -84,6 +87,7 @@
Loading
84 87
    private[scio] val DefaultNumShards = 0
85 88
    private[scio] val DefaultCompression = Compression.UNCOMPRESSED
86 89
    private[scio] val DefaultShardNameTemplate = "/part" + ShardNameTemplate.INDEX_OF_MAX
90 +
    private[scio] val DefaultTempDirectory = null
87 91
  }
88 92
  final case class WriteParam(
89 93
    suffix: String = WriteParam.DefaultSuffix,
@@ -91,7 +95,8 @@
Loading
91 95
    compression: Compression = WriteParam.DefaultCompression,
92 96
    header: Option[String] = WriteParam.DefaultHeader,
93 97
    footer: Option[String] = WriteParam.DefaultFooter,
94 -
    shardNameTemplate: String = WriteParam.DefaultShardNameTemplate
98 +
    shardNameTemplate: String = WriteParam.DefaultShardNameTemplate,
99 +
    tempDirectory: String = WriteParam.DefaultTempDirectory
95 100
  )
96 101
97 102
  private[scio] def textFile(path: String): Iterator[String] = {

@@ -24,6 +24,8 @@
Loading
24 24
import com.spotify.scio.ScioContext
25 25
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions
26 26
import org.apache.beam.sdk.extensions.gcp.util.Transport
27 +
import org.apache.beam.sdk.io.FileSystems
28 +
import org.apache.beam.sdk.io.fs.ResourceId
27 29
import org.apache.beam.sdk.{PipelineResult, PipelineRunner}
28 30
import org.slf4j.LoggerFactory
29 31
@@ -91,4 +93,7 @@
Loading
91 93
    case key: Array[_] => ArraySeq.unsafeWrapArray(key).##
92 94
    case key           => key.##
93 95
  }
96 +
97 +
  def toResourceId(directory: String): ResourceId =
98 +
    FileSystems.matchNewResource(directory, true)
94 99
}

@@ -654,6 +654,9 @@
Loading
654 654
  /** Whether this is a test context. */
655 655
  def isTest: Boolean = testId.isDefined
656 656
657 +
  // The temp location of the job as specified by the `--tempLocation` parameter.
658 +
  def tempLocation: String = this.options.getTempLocation
659 +
657 660
  // =======================================================================
658 661
  // Read operations
659 662
  // =======================================================================

@@ -40,9 +40,10 @@
Loading
40 40
    path: String,
41 41
    numShards: Int,
42 42
    suffix: String,
43 -
    destinationFn: A => String
44 -
  ): FileIO.Write[String, A] =
45 -
    FileIO
43 +
    destinationFn: A => String,
44 +
    tempDirectory: String = null
45 +
  ): FileIO.Write[String, A] = {
46 +
    val transform = FileIO
46 47
      .writeDynamic[String, A]()
47 48
      .to(path)
48 49
      .withNumShards(numShards)
@@ -51,6 +52,9 @@
Loading
51 52
      .withNaming(Functions.serializableFn { destination: String =>
52 53
        FileIO.Write.defaultNaming(s"$destination/part", suffix)
53 54
      })
55 +
56 +
    Option(tempDirectory).fold(transform)(transform.withTempDirectory)
57 +
  }
54 58
}
55 59
56 60
final class DynamicSpecificRecordSCollectionOps[T <: SpecificRecord](
@@ -64,7 +68,8 @@
Loading
64 68
    numShards: Int = 0,
65 69
    suffix: String = ".avro",
66 70
    codec: CodecFactory = CodecFactory.deflateCodec(6),
67 -
    metadata: Map[String, AnyRef] = Map.empty
71 +
    metadata: Map[String, AnyRef] = Map.empty,
72 +
    tempDirectory: String = null
68 73
  )(
69 74
    destinationFn: T => String
70 75
  )(implicit ct: ClassTag[T]): ClosedTap[Nothing] = {
@@ -81,7 +86,7 @@
Loading
81 86
        .withCodec(codec)
82 87
        .withMetadata(nm)
83 88
      val write =
84 -
        writeDynamic(path, numShards, suffix, destinationFn)
89 +
        writeDynamic(path, numShards, suffix, destinationFn, tempDirectory)
85 90
          .via(sink)
86 91
87 92
      self.applyInternal(write)
@@ -106,7 +111,8 @@
Loading
106 111
    numShards: Int = 0,
107 112
    suffix: String = ".avro",
108 113
    codec: CodecFactory = CodecFactory.deflateCodec(6),
109 -
    metadata: Map[String, AnyRef] = Map.empty
114 +
    metadata: Map[String, AnyRef] = Map.empty,
115 +
    tempDirectory: String = null
110 116
  )(
111 117
    destinationFn: T => String
112 118
  ): ClosedTap[Nothing] = {
@@ -128,7 +134,7 @@
Loading
128 134
        .withCodec(codec)
129 135
        .withMetadata(nm)
130 136
      val write =
131 -
        writeDynamic(path, numShards, suffix, destinationFn)
137 +
        writeDynamic(path, numShards, suffix, destinationFn, tempDirectory)
132 138
          .via(sink)
133 139
134 140
      self.applyInternal(write)
@@ -150,7 +156,8 @@
Loading
150 156
    path: String,
151 157
    numShards: Int = 0,
152 158
    suffix: String = ".txt",
153 -
    compression: Compression = Compression.UNCOMPRESSED
159 +
    compression: Compression = Compression.UNCOMPRESSED,
160 +
    tempDirectory: String = null
154 161
  )(destinationFn: String => String)(implicit ct: ClassTag[T]): ClosedTap[Nothing] = {
155 162
    val s = if (classOf[String] isAssignableFrom ct.runtimeClass) {
156 163
      self.asInstanceOf[SCollection[String]]
@@ -162,7 +169,7 @@
Loading
162 169
        "Text file with dynamic destinations cannot be used in a test context"
163 170
      )
164 171
    } else {
165 -
      val write = writeDynamic(path, numShards, suffix, destinationFn)
172 +
      val write = writeDynamic(path, numShards, suffix, destinationFn, tempDirectory)
166 173
        .via(beam.TextIO.sink())
167 174
        .withCompression(compression)
168 175
      s.applyInternal(write)
@@ -181,7 +188,8 @@
Loading
181 188
    numShards: Int = 0,
182 189
    suffix: String = ".protobuf",
183 190
    codec: CodecFactory = CodecFactory.deflateCodec(6),
184 -
    metadata: Map[String, AnyRef] = Map.empty
191 +
    metadata: Map[String, AnyRef] = Map.empty,
192 +
    tempDirectory: String = null
185 193
  )(destinationFn: T => String)(implicit ct: ClassTag[T]): ClosedTap[Nothing] = {
186 194
    val protoCoder = Coder.protoMessageCoder[T]
187 195
    val elemCoder = CoderMaterializer.beam(self.context, protoCoder)
@@ -205,7 +213,7 @@
Loading
205 213
        .withCodec(codec)
206 214
        .withMetadata(nm)
207 215
      val write =
208 -
        writeDynamic(path, numShards, suffix, destinationFn)
216 +
        writeDynamic(path, numShards, suffix, destinationFn, tempDirectory)
209 217
          .via(sink)
210 218
211 219
      self.applyInternal(write)

@@ -44,9 +44,10 @@
Loading
44 44
    schema: Schema,
45 45
    suffix: String = AvroIO.WriteParam.DefaultSuffix,
46 46
    codec: CodecFactory = AvroIO.WriteParam.DefaultCodec,
47 -
    metadata: Map[String, AnyRef] = AvroIO.WriteParam.DefaultMetadata
47 +
    metadata: Map[String, AnyRef] = AvroIO.WriteParam.DefaultMetadata,
48 +
    tempDirectory: String = AvroIO.WriteParam.DefaultTempDirectory
48 49
  ): ClosedTap[GenericRecord] = {
49 -
    val param = AvroIO.WriteParam(numShards, suffix, codec, metadata)
50 +
    val param = AvroIO.WriteParam(numShards, suffix, codec, metadata, tempDirectory)
50 51
    self.write(GenericRecordIO(path, schema))(param)
51 52
  }
52 53
}
@@ -64,9 +65,10 @@
Loading
64 65
    numShards: Int = AvroIO.WriteParam.DefaultNumShards,
65 66
    suffix: String = ".obj",
66 67
    codec: CodecFactory = AvroIO.WriteParam.DefaultCodec,
67 -
    metadata: Map[String, AnyRef] = AvroIO.WriteParam.DefaultMetadata
68 +
    metadata: Map[String, AnyRef] = AvroIO.WriteParam.DefaultMetadata,
69 +
    tempDirectory: String = AvroIO.WriteParam.DefaultTempDirectory
68 70
  )(implicit coder: Coder[T]): ClosedTap[T] = {
69 -
    val param = ObjectFileIO.WriteParam(numShards, suffix, codec, metadata)
71 +
    val param = ObjectFileIO.WriteParam(numShards, suffix, codec, metadata, tempDirectory)
70 72
    self.write(ObjectFileIO(path))(param)
71 73
  }
72 74
}
@@ -83,9 +85,10 @@
Loading
83 85
    numShards: Int = AvroIO.WriteParam.DefaultNumShards,
84 86
    suffix: String = AvroIO.WriteParam.DefaultSuffix,
85 87
    codec: CodecFactory = AvroIO.WriteParam.DefaultCodec,
86 -
    metadata: Map[String, AnyRef] = AvroIO.WriteParam.DefaultMetadata
88 +
    metadata: Map[String, AnyRef] = AvroIO.WriteParam.DefaultMetadata,
89 +
    tempDirectory: String = AvroIO.WriteParam.DefaultTempDirectory
87 90
  )(implicit ct: ClassTag[T], coder: Coder[T]): ClosedTap[T] = {
88 -
    val param = AvroIO.WriteParam(numShards, suffix, codec, metadata)
91 +
    val param = AvroIO.WriteParam(numShards, suffix, codec, metadata, tempDirectory)
89 92
    self.write(SpecificRecordIO[T](path))(param)
90 93
  }
91 94
}
@@ -102,9 +105,10 @@
Loading
102 105
    numShards: Int = AvroIO.WriteParam.DefaultNumShards,
103 106
    suffix: String = AvroIO.WriteParam.DefaultSuffix,
104 107
    codec: CodecFactory = AvroIO.WriteParam.DefaultCodec,
105 -
    metadata: Map[String, AnyRef] = AvroIO.WriteParam.DefaultMetadata
108 +
    metadata: Map[String, AnyRef] = AvroIO.WriteParam.DefaultMetadata,
109 +
    tempDirectory: String = AvroIO.WriteParam.DefaultTempDirectory
106 110
  )(implicit ct: ClassTag[T], tt: TypeTag[T], coder: Coder[T]): ClosedTap[T] = {
107 -
    val param = AvroIO.WriteParam(numShards, suffix, codec, metadata)
111 +
    val param = AvroIO.WriteParam(numShards, suffix, codec, metadata, tempDirectory)
108 112
    self.write(AvroTyped.AvroIO[T](path))(param)
109 113
  }
110 114
}
@@ -122,9 +126,10 @@
Loading
122 126
    numShards: Int = AvroIO.WriteParam.DefaultNumShards,
123 127
    suffix: String = ".protobuf",
124 128
    codec: CodecFactory = AvroIO.WriteParam.DefaultCodec,
125 -
    metadata: Map[String, AnyRef] = AvroIO.WriteParam.DefaultMetadata
129 +
    metadata: Map[String, AnyRef] = AvroIO.WriteParam.DefaultMetadata,
130 +
    tempDirectory: String = AvroIO.WriteParam.DefaultTempDirectory
126 131
  )(implicit ct: ClassTag[T], coder: Coder[T]): ClosedTap[T] = {
127 -
    val param = ProtobufIO.WriteParam(numShards, suffix, codec, metadata)
132 +
    val param = ProtobufIO.WriteParam(numShards, suffix, codec, metadata, tempDirectory)
128 133
    self.write(ProtobufIO[T](path))(param)
129 134
  }
130 135
}

@@ -1503,7 +1503,8 @@
Loading
1503 1503
    compression: Compression = TextIO.WriteParam.DefaultCompression,
1504 1504
    header: Option[String] = TextIO.WriteParam.DefaultHeader,
1505 1505
    footer: Option[String] = TextIO.WriteParam.DefaultFooter,
1506 -
    shardNameTemplate: String = TextIO.WriteParam.DefaultShardNameTemplate
1506 +
    shardNameTemplate: String = TextIO.WriteParam.DefaultShardNameTemplate,
1507 +
    tempDirectory: String = TextIO.WriteParam.DefaultTempDirectory
1507 1508
  )(implicit ct: ClassTag[T]): ClosedTap[String] = {
1508 1509
    val s = if (classOf[String] isAssignableFrom ct.runtimeClass) {
1509 1510
      this.asInstanceOf[SCollection[String]]
@@ -1511,7 +1512,15 @@
Loading
1511 1512
      this.map(_.toString)
1512 1513
    }
1513 1514
    s.write(TextIO(path))(
1514 -
      TextIO.WriteParam(suffix, numShards, compression, header, footer, shardNameTemplate)
1515 +
      TextIO.WriteParam(
1516 +
        suffix,
1517 +
        numShards,
1518 +
        compression,
1519 +
        header,
1520 +
        footer,
1521 +
        shardNameTemplate,
1522 +
        tempDirectory
1523 +
      )
1515 1524
    )
1516 1525
  }
1517 1526
@@ -1529,7 +1538,8 @@
Loading
1529 1538
    footer: Array[Byte] = BinaryIO.WriteParam.DefaultFooter,
1530 1539
    framePrefix: Array[Byte] => Array[Byte] = BinaryIO.WriteParam.DefaultFramePrefix,
1531 1540
    frameSuffix: Array[Byte] => Array[Byte] = BinaryIO.WriteParam.DefaultFrameSuffix,
1532 -
    fileNaming: Option[FileNaming] = BinaryIO.WriteParam.DefaultFileNaming
1541 +
    fileNaming: Option[FileNaming] = BinaryIO.WriteParam.DefaultFileNaming,
1542 +
    tempDirectory: String = BinaryIO.WriteParam.DefaultTempDirectory
1533 1543
  )(implicit ev: T <:< Array[Byte]): ClosedTap[Nothing] =
1534 1544
    this
1535 1545
      .covary_[Array[Byte]]
@@ -1544,7 +1554,8 @@
Loading
1544 1554
            footer,
1545 1555
            framePrefix,
1546 1556
            frameSuffix,
1547 -
            fileNaming
1557 +
            fileNaming,
1558 +
            tempDirectory
1548 1559
          )
1549 1560
      )
1550 1561

@@ -129,14 +129,20 @@
Loading
129 129
    numShards: Int,
130 130
    suffix: String,
131 131
    codec: CodecFactory,
132 -
    metadata: Map[String, AnyRef]
133 -
  ) =
134 -
    write
132 +
    metadata: Map[String, AnyRef],
133 +
    tempDirectory: String
134 +
  ) = {
135 +
    val transform = write
135 136
      .to(ScioUtil.pathWithShards(path))
136 137
      .withNumShards(numShards)
137 138
      .withSuffix(suffix)
138 139
      .withCodec(codec)
139 140
      .withMetadata(metadata.asJava)
141 +
142 +
    Option(tempDirectory)
143 +
      .map(ScioUtil.toResourceId)
144 +
      .fold(transform)(transform.withTempDirectory)
145 +
  }
140 146
}
141 147
142 148
final case class SpecificRecordIO[T <: SpecificRecord: ClassTag: Coder](path: String)
@@ -164,7 +170,15 @@
Loading
164 170
    val cls = ScioUtil.classOf[T]
165 171
    val t = beam.AvroIO.write(cls)
166 172
    data.applyInternal(
167 -
      avroOut(t, path, params.numShards, params.suffix, params.codec, params.metadata)
173 +
      avroOut(
174 +
        t,
175 +
        path,
176 +
        params.numShards,
177 +
        params.suffix,
178 +
        params.codec,
179 +
        params.metadata,
180 +
        params.tempDirectory
181 +
      )
168 182
    )
169 183
    tap(())
170 184
  }
@@ -199,7 +213,15 @@
Loading
199 213
  ): Tap[GenericRecord] = {
200 214
    val t = beam.AvroIO.writeGenericRecords(schema)
201 215
    data.applyInternal(
202 -
      avroOut(t, path, params.numShards, params.suffix, params.codec, params.metadata)
216 +
      avroOut(
217 +
        t,
218 +
        path,
219 +
        params.numShards,
220 +
        params.suffix,
221 +
        params.codec,
222 +
        params.metadata,
223 +
        params.tempDirectory
224 +
      )
203 225
    )
204 226
    tap(())
205 227
  }
@@ -249,13 +271,15 @@
Loading
249 271
    private[avro] val DefaultSuffix = ""
250 272
    private[avro] val DefaultCodec: CodecFactory = CodecFactory.deflateCodec(6)
251 273
    private[avro] val DefaultMetadata: Map[String, AnyRef] = Map.empty
274 +
    private[avro] val DefaultTempDirectory = null
252 275
  }
253 276
254 277
  final case class WriteParam private (
255 278
    numShards: Int = WriteParam.DefaultNumShards,
256 279
    private val _suffix: String = WriteParam.DefaultSuffix,
257 280
    codec: CodecFactory = WriteParam.DefaultCodec,
258 -
    metadata: Map[String, AnyRef] = WriteParam.DefaultMetadata
281 +
    metadata: Map[String, AnyRef] = WriteParam.DefaultMetadata,
282 +
    tempDirectory: String = WriteParam.DefaultTempDirectory
259 283
  ) {
260 284
    val suffix: String = _suffix + ".avro"
261 285
  }
@@ -278,15 +302,21 @@
Loading
278 302
      numShards: Int,
279 303
      suffix: String,
280 304
      codec: CodecFactory,
281 -
      metadata: Map[String, AnyRef]
282 -
    ) =
283 -
      write
305 +
      metadata: Map[String, AnyRef],
306 +
      tempDirectory: String
307 +
    ) = {
308 +
      val transform = write
284 309
        .to(ScioUtil.pathWithShards(path))
285 310
        .withNumShards(numShards)
286 311
        .withSuffix(suffix)
287 312
        .withCodec(codec)
288 313
        .withMetadata(metadata.asJava)
289 314
315 +
      Option(tempDirectory)
316 +
        .map(ScioUtil.toResourceId)
317 +
        .fold(transform)(transform.withTempDirectory)
318 +
    }
319 +
290 320
    /**
291 321
     * Get a typed SCollection from an Avro schema.
292 322
     *
@@ -315,7 +345,15 @@
Loading
315 345
        })
316 346
        .withSchema(avroT.schema)
317 347
      data.applyInternal(
318 -
        typedAvroOut(t, path, params.numShards, params.suffix, params.codec, params.metadata)
348 +
        typedAvroOut(
349 +
          t,
350 +
          path,
351 +
          params.numShards,
352 +
          params.suffix,
353 +
          params.codec,
354 +
          params.metadata,
355 +
          params.tempDirectory
356 +
        )
319 357
      )
320 358
      tap(())
321 359
    }

@@ -61,6 +61,9 @@
Loading
61 61
        .withSuffix(params.suffix)
62 62
    }(transform.withNaming)
63 63
64 +
    transform = Option(params.tempDirectory)
65 +
      .fold(transform)(transform.withTempDirectory)
66 +
64 67
    data.applyInternal(transform)
65 68
    EmptyTap
66 69
  }
@@ -99,6 +102,7 @@
Loading
99 102
    private[scio] val DefaultFooter = Array.emptyByteArray
100 103
    private[scio] val DefaultFramePrefix: Array[Byte] => Array[Byte] = _ => Array.emptyByteArray
101 104
    private[scio] val DefaultFrameSuffix: Array[Byte] => Array[Byte] = _ => Array.emptyByteArray
105 +
    private[scio] val DefaultTempDirectory = null
102 106
  }
103 107
104 108
  final case class WriteParam(
@@ -110,7 +114,8 @@
Loading
110 114
    footer: Array[Byte] = WriteParam.DefaultFooter,
111 115
    framePrefix: Array[Byte] => Array[Byte] = WriteParam.DefaultFramePrefix,
112 116
    frameSuffix: Array[Byte] => Array[Byte] = WriteParam.DefaultFrameSuffix,
113 -
    fileNaming: Option[FileNaming] = WriteParam.DefaultFileNaming
117 +
    fileNaming: Option[FileNaming] = WriteParam.DefaultFileNaming,
118 +
    tempDirectory: String = WriteParam.DefaultTempDirectory
114 119
  )
115 120
116 121
  final private class BytesSink(
1
coverage:
2
  status:
3
    project: no
4
    patch: no
5
    changes: no
Sunburst
The inner-most circle is the entire project, moving away from the center are folders then, finally, a single file. The size and color of each slice is representing the number of statements and the coverage, respectively.
Icicle
The top section represents the entire project. Proceeding with folders and finally individual files. The size and color of each slice is representing the number of statements and the coverage, respectively.
Grid
Each block represents a single file in the project. The size and color of each block is represented by the number of statements and the coverage, respectively.
Loading