spotify / ratatool

@@ -22,7 +22,7 @@
Loading
22 22
23 23
import com.google.common.hash.Hasher
24 24
import com.spotify.ratatool.io.{AvroIO, FileStorage}
25 -
import com.spotify.ratatool.samplers.util.{ByteEncoding, Precision, RawEncoding, SampleDistribution}
25 +
import com.spotify.ratatool.samplers.util._
26 26
import com.spotify.scio.ScioContext
27 27
import com.spotify.scio.io.{ClosedTap, MaterializeTap}
28 28
import org.apache.avro.Schema
@@ -203,6 +203,7 @@
Loading
203 203
                               fields: Seq[String],
204 204
                               fraction: Double,
205 205
                               seed: Option[Int],
206 +
                               hashAlgorithm: HashAlgorithm,
206 207
                               distribution: Option[SampleDistribution],
207 208
                               distributionFields: Seq[String],
208 209
                               precision: Precision,
@@ -221,8 +222,8 @@
Loading
221 222
222 223
      val coll = sc.avroFile(input, schema)
223 224
224 -
      val sampledCollection = sampleAvro(coll, fraction, schema, fields, seed, distribution,
225 -
        distributionFields, precision, maxKeySize, byteEncoding)
225 +
      val sampledCollection = sampleAvro(coll, fraction, schema, fields, seed, hashAlgorithm,
226 +
        distribution, distributionFields, precision, maxKeySize, byteEncoding)
226 227
227 228
      val r = sampledCollection.saveAsAvroFile(output, schema = schema)
228 229
      sc.run().waitUntilDone()

@@ -17,10 +17,7 @@
Loading
17 17
18 18
package com.spotify.ratatool.samplers.util
19 19
20 -
import java.nio.charset.Charset
21 -
22 -
import com.google.common.hash.{Funnel, HashCode, Hasher}
23 -
import com.google.common.io.BaseEncoding
20 +
import com.google.common.hash.{Hasher, Hashing}
24 21
25 22
trait SampleDistribution
26 23
case object StratifiedDistribution extends SampleDistribution
@@ -89,3 +86,33 @@
Loading
89 86
    }
90 87
  }
91 88
}
89 +
90 +
/**
 * Hashing algorithm used to build the deterministic sampling hash.
 * Implementations must return a fresh [[Hasher]] per call, optionally salted with `seed`.
 */
sealed trait HashAlgorithm {
  /** Returns a new hasher; `seed` salts the hash so runs are reproducible when provided. */
  def hashFn(seed: Option[Int]): Hasher
}

/** Murmur3 128-bit hash (kept for backwards compatibility with older samples). */
case object MurmurHash extends HashAlgorithm {
  def hashFn(seed: Option[Int]): Hasher =
    // NOTE: without an explicit seed this falls back to the current time, so the
    // sample is NOT reproducible across runs — matches pre-existing behavior.
    Hashing.murmur3_128(seed.getOrElse(System.currentTimeMillis().toInt)).newHasher()
}

/** FarmHash Fingerprint64 (matches BigQuery's FARM_FINGERPRINT for compatibility). */
case object FarmHash extends HashAlgorithm {
  def hashFn(seed: Option[Int]): Hasher = seed match {
    // farmHashFingerprint64 takes no seed parameter, so the seed is folded in
    // by pre-feeding it to the hasher instead.
    case Some(s) => Hashing.farmHashFingerprint64().newHasher().putInt(s)
    case _       => Hashing.farmHashFingerprint64().newHasher()
  }
}

object HashAlgorithm {
  /**
   * Parses a CLI string into a [[HashAlgorithm]].
   *
   * @param s either `"murmur"` or `"farm"`
   * @throws IllegalArgumentException for any other value
   */
  def fromString(s: String): HashAlgorithm = s match {
    case "murmur" => MurmurHash
    case "farm"   => FarmHash
    case _        => throw new IllegalArgumentException(s"Invalid hashing function $s")
  }
}

@@ -49,21 +49,6 @@
Loading
49 49
  private[samplers] val utf8Charset = Charset.forName("UTF-8")
50 50
  private[samplers] val fieldSep = '.'
51 51
52 -
  private[samplers] sealed trait HashAlgorithm {
53 -
    def hashFn(seed: Option[Int]): Hasher
54 -
  }
55 -
  private[samplers] case object MurmurHash extends HashAlgorithm {
56 -
    def hashFn(seed:Option[Int]): Hasher = {
57 -
      Hashing.murmur3_128(seed.getOrElse(System.currentTimeMillis().toInt)).newHasher()
58 -
    }
59 -
  }
60 -
  private[samplers] case object FarmHash extends HashAlgorithm {
61 -
    def hashFn(seed: Option[Int]): Hasher = seed match {
62 -
      case Some(s) => Hashing.farmHashFingerprint64().newHasher().putInt(s)
63 -
      case _ => Hashing.farmHashFingerprint64().newHasher()
64 -
    }
65 -
  }
66 -
67 52
  /**
68 53
    * @param hashAlgorithm either MurmurHash (for backwards compatibility) or FarmHash
69 54
    * @param seed optional start value to ensure the same result every time if same seed passed in
@@ -119,6 +104,7 @@
Loading
119 104
        |  --output=<path>                            Output file path or BigQuery table
120 105
        |  [--fields=<field1,field2,...>]             An optional list of fields to include in hashing for sampling cohort selection
121 106
        |  [--seed=<seed>]                            An optional seed used in hashing for sampling cohort selection
107 +
        |  [--hashAlgorithm=(murmur|farm)]            An optional arg to select the hashing algorithm for sampling cohort selection. Defaults to FarmHash for BigQuery compatibility
122 108
        |  [--distribution=(uniform|stratified)]      An optional arg to sample for a stratified or uniform distribution. Must provide `distributionFields`
123 109
        |  [--distributionFields=<field1,field2,...>] An optional list of fields to sample for distribution. Must provide `distribution`
124 110
        |  [--exact]                                  An optional arg for higher precision distribution sampling.
@@ -175,7 +161,15 @@
Loading
175 161
        1e6.toInt
176 162
      }
177 163
178 -
    val (samplePct, input, output, fields, seed, distribution, distributionFields, exact) = try {
164 +
    val (samplePct,
165 +
    input,
166 +
    output,
167 +
    fields,
168 +
    seed,
169 +
    hashAlgorithm,
170 +
    distribution,
171 +
    distributionFields,
172 +
    exact) = try {
179 173
      val pct = args("sample").toFloat
180 174
      require(pct > 0.0F && pct <= 1.0F)
181 175
      (pct,
@@ -183,9 +177,11 @@
Loading
183 177
        args("output"),
184 178
        args.list("fields"),
185 179
        args.optional("seed"),
180 +
        args.optional("hashAlgorithm").map(HashAlgorithm.fromString).getOrElse(FarmHash),
186 181
        args.optional("distribution").map(SampleDistribution.fromString),
187 182
        args.list("distributionFields"),
188 -
        Precision.fromBoolean(args.boolean("exact", default = false)))
183 +
        Precision.fromBoolean(args.boolean("exact", default = false))
184 +
      )
189 185
    } catch {
190 186
      case e: Throwable =>
191 187
        usage()
@@ -224,6 +220,7 @@
Loading
224 220
        fields,
225 221
        samplePct,
226 222
        seed.map(_.toInt),
223 +
        hashAlgorithm,
227 224
        distribution,
228 225
        distributionFields,
229 226
        exact,
@@ -243,6 +240,7 @@
Loading
243 240
        fields,
244 241
        samplePct,
245 242
        seed.map(_.toInt),
243 +
        hashAlgorithm,
246 244
        distribution,
247 245
        distributionFields,
248 246
        exact,
@@ -263,6 +261,7 @@
Loading
263 261
   * @param fraction The sample rate
264 262
   * @param fields Fields to construct hash over for determinism
265 263
   * @param seed Seed used to salt the deterministic hash
264 +
   * @param hashAlgorithm Hash algorithm, either MurmurHash or FarmHash
266 265
   * @param distribution Desired output sample distribution
267 266
   * @param distributionFields Fields to construct distribution over (strata = set of unique fields)
268 267
   * @param precision Approximate or Exact precision
@@ -278,6 +277,7 @@
Loading
278 277
                                        fraction: Double,
279 278
                                        fields: Seq[String],
280 279
                                        seed: Option[Int],
280 +
                                        hashAlgorithm: HashAlgorithm,
281 281
                                        distribution: Option[SampleDistribution],
282 282
                                        distributionFields: Seq[String],
283 283
                                        precision: Precision,
@@ -292,7 +292,7 @@
Loading
292 292
    : SCollection[(U, (T, Double))] = {
293 293
      s.map { v =>
294 294
        val hasher =
295 -
          ByteHasher.wrap(BigSampler.hashFun(seed = seed), byteEncoding, utf8Charset)
295 +
          ByteHasher.wrap(BigSampler.hashFun(hashAlgorithm, seed), byteEncoding, utf8Charset)
296 296
        val hash = fields.foldLeft(hasher)((h, f) => hashFn(v, f, h)).hash
297 297
        (keyFn(v), (v, boundLong(hash.asLong)))
298 298
      }
@@ -310,7 +310,7 @@
Loading
310 310
      case (Deterministic, None, Approximate) =>
311 311
        s.flatMap { e =>
312 312
          val hasher =
313 -
            ByteHasher.wrap(BigSampler.hashFun(seed = seed), byteEncoding, utf8Charset)
313 +
            ByteHasher.wrap(BigSampler.hashFun(hashAlgorithm, seed), byteEncoding, utf8Charset)
314 314
          val hash = fields.foldLeft(hasher)((h, f) => hashFn(e, f, h)).hash
315 315
          BigSampler.diceElement(e, hash, fraction)
316 316
        }
@@ -318,7 +318,10 @@
Loading
318 318
      case (Deterministic, Some(StratifiedDistribution), Approximate) =>
319 319
        val sampled = s.flatMap { v =>
320 320
          val hasher =
321 -
            ByteHasher.wrap(BigSampler.hashFun(seed = seed), byteEncoding, utf8Charset)
321 +
            ByteHasher.wrap(
322 +
              BigSampler.hashFun(hashAlgorithm, seed),
323 +
              byteEncoding,
324 +
              utf8Charset)
322 325
          val hash = fields.foldLeft(hasher)((h, f) => hashFn(v, f, h)).hash
323 326
          BigSampler.diceElement(v, hash, fraction)
324 327
        }.keyBy(keyFn(_))
@@ -332,7 +335,7 @@
Loading
332 335
        val sampled = s.keyBy(keyFn(_))
333 336
          .hashJoin(probPerKey).flatMap { case (k, (v, prob)) =>
334 337
          val hasher =
335 -
            ByteHasher.wrap(BigSampler.hashFun(seed = seed), byteEncoding, utf8Charset)
338 +
            ByteHasher.wrap(BigSampler.hashFun(hashAlgorithm, seed), byteEncoding, utf8Charset)
336 339
          val hash = fields.foldLeft(hasher)((h, f) => hashFn(v, f, h)).hash
337 340
          BigSampler.diceElement(v, hash, prob).map(e => (k, e))
338 341
        }

@@ -18,15 +18,17 @@
Loading
18 18
package com.spotify.ratatool.samplers
19 19
20 20
import java.util.{List => JList}
21 -
22 21
import com.google.api.services.bigquery.model.{TableFieldSchema, TableReference}
23 22
import com.google.common.hash.Hasher
24 -
import com.spotify.ratatool.samplers.util.{ByteEncoding, Precision, RawEncoding, SampleDistribution}
23 +
import com.spotify.ratatool.samplers.util._
25 24
import com.spotify.scio.ScioContext
26 25
import com.spotify.scio.bigquery.TableRow
27 26
import com.spotify.scio.io.ClosedTap
28 -
import org.apache.beam.sdk.io.gcp.bigquery.{BigQueryIO, BigQueryOptions,
29 -
  PatchedBigQueryServicesImpl}
27 +
import org.apache.beam.sdk.io.gcp.bigquery.{
28 +
  BigQueryIO,
29 +
  BigQueryOptions,
30 +
  PatchedBigQueryServicesImpl
31 +
}
30 32
import org.slf4j.LoggerFactory
31 33
32 34
import scala.jdk.CollectionConverters._
@@ -136,6 +138,7 @@
Loading
136 138
                               fields: List[String],
137 139
                               fraction: Double,
138 140
                               seed: Option[Int],
141 +
                               hashAlgorithm: HashAlgorithm,
139 142
                               distribution: Option[SampleDistribution],
140 143
                               distributionFields: List[String],
141 144
                               precision: Precision,
@@ -156,8 +159,8 @@
Loading
156 159
157 160
      val coll = sc.bigQueryTable(Table.Ref(inputTbl))
158 161
159 -
      val sampledCollection = sampleTableRow(coll, fraction, schema, fields, seed, distribution,
160 -
        distributionFields, precision, sizePerKey, byteEncoding)
162 +
      val sampledCollection = sampleTableRow(coll, fraction, schema, fields, seed, hashAlgorithm,
163 +
        distribution, distributionFields, precision, sizePerKey, byteEncoding)
161 164
162 165
      val r = sampledCollection
163 166
        .saveAsBigQueryTable(Table.Ref(outputTbl), schema, WRITE_EMPTY, CREATE_IF_NEEDED,

@@ -38,6 +38,7 @@
Loading
38 38
   * @param fraction The sample rate
39 39
   * @param fields Fields to construct hash over for determinism
40 40
   * @param seed Seed used to salt the deterministic hash
41 +
   * @param hashAlgorithm Hashing algorithm to use
41 42
   * @param distribution Desired output sample distribution
42 43
   * @param distributionFields Fields to construct distribution over (strata = set of unique fields)
43 44
   * @param precision Approximate or Exact precision
@@ -51,6 +52,7 @@
Loading
51 52
                                                 schema: => Schema,
52 53
                                                 fields: Seq[String] = Seq(),
53 54
                                                 seed: Option[Int] = None,
55 +
                                                 hashAlgorithm: HashAlgorithm = FarmHash,
54 56
                                                 distribution: Option[SampleDistribution] = None,
55 57
                                                 distributionFields: Seq[String] = Seq(),
56 58
                                                 precision: Precision = Approximate,
@@ -60,7 +62,8 @@
Loading
60 62
    val schemaSer = schema.toString(false)
61 63
    @transient lazy val schemaSerDe = new Schema.Parser().parse(schemaSer)
62 64
63 -
    BigSampler.sample(coll, fraction, fields, seed, distribution, distributionFields, precision,
65 +
    BigSampler.sample(coll, fraction, fields, seed, hashAlgorithm, distribution, distributionFields,
66 +
      precision,
64 67
      BigSamplerAvro.hashAvroField(schemaSerDe),
65 68
      BigSamplerAvro.buildKey(schemaSerDe, distributionFields),
66 69
      maxKeySize, byteEncoding)
@@ -72,6 +75,7 @@
Loading
72 75
   * @param fraction The sample rate
73 76
   * @param fields Fields to construct hash over for determinism
74 77
   * @param seed Seed used to salt the deterministic hash
78 +
   * @param hashAlgorithm Hashing algorithm to use
75 79
   * @param distribution Desired output sample distribution
76 80
   * @param distributionFields Fields to construct distribution over (strata = set of unique fields)
77 81
   * @param precision Approximate or Exact precision
@@ -84,6 +88,7 @@
Loading
84 88
                     schema: TableSchema,
85 89
                     fields: Seq[String] = Seq(),
86 90
                     seed: Option[Int] = None,
91 +
                     hashAlgorithm: HashAlgorithm = FarmHash,
87 92
                     distribution: Option[SampleDistribution] = None,
88 93
                     distributionFields: Seq[String] = Seq(),
89 94
                     precision: Precision = Approximate,
@@ -94,7 +99,8 @@
Loading
94 99
    @transient lazy val schemaFields =
95 100
      JsonSerDe.fromJsonString(schemaStr, classOf[TableSchema]).getFields.asScala.toList
96 101
97 -
    BigSampler.sample(coll, fraction, fields, seed, distribution, distributionFields, precision,
102 +
    BigSampler.sample(coll, fraction, fields, seed, hashAlgorithm, distribution, distributionFields,
103 +
      precision,
98 104
      BigSamplerBigQuery.hashTableRow(schemaFields),
99 105
      BigSamplerBigQuery.buildKey(schemaFields, distributionFields),
100 106
      maxKeySize, byteEncoding)
@@ -106,6 +112,7 @@
Loading
106 112
   * @param fraction The sample rate
107 113
   * @param fields Fields to construct hash over for determinism
108 114
   * @param seed Seed used to salt the deterministic hash
115 +
   * @param hashAlgorithm Hashing algorithm to use
109 116
   * @param distribution Desired output sample distribution
110 117
   * @param distributionFields Fields to construct distribution over (strata = set of unique fields)
111 118
   * @param precision Approximate or Exact precision
@@ -118,13 +125,15 @@
Loading
118 125
                                                   fraction: Double,
119 126
                                                   fields: Seq[String] = Seq(),
120 127
                                                   seed: Option[Int] = None,
128 +
                                                   hashAlgorithm: HashAlgorithm = FarmHash,
121 129
                                                   distribution: Option[SampleDistribution]=None,
122 130
                                                   distributionFields: Seq[String] = Seq(),
123 131
                                                   precision: Precision = Approximate,
124 132
                                                   maxKeySize: Int = 1e6.toInt,
125 133
                                                   byteEncoding: ByteEncoding = RawEncoding)
126 134
  : SCollection[T] = {
127 -
    BigSampler.sample(coll, fraction, fields, seed, distribution, distributionFields, precision,
135 +
    BigSampler.sample(coll, fraction, fields, seed, hashAlgorithm, distribution, distributionFields,
136 +
      precision,
128 137
      BigSamplerProto.hashProtobufField,
129 138
      BigSamplerProto.buildKey(distributionFields),
130 139
      maxKeySize, byteEncoding)
6771
ratatoolDiffy
6782
ratatoolSampling
6775
ratatoolScalacheck
6780
ratatoolScalacheck
6774
ratatoolShapeless
6783
ratatoolShapeless
6773
ratatoolExamples
6779
ratatoolDiffy
6778
ratatoolExamples
6784
ratatoolSampling

No yaml found.

Create your codecov.yml to customize your Codecov experience

Sunburst
The inner-most circle is the entire project; moving away from the center are folders and, finally, a single file. The size and color of each slice represent the number of statements and the coverage, respectively.
Icicle
The top section represents the entire project, proceeding through folders and finally individual files. The size and color of each slice represent the number of statements and the coverage, respectively.
Grid
Each block represents a single file in the project. The size and color of each block represent the number of statements and the coverage, respectively.
Loading