Skip to content
Open

n/a #418

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ kt_jvm_library(
"//main/com/google/privacy/differentialprivacy/pipelinedp4j/core:dp_functions_params",
"//main/com/google/privacy/differentialprivacy/pipelinedp4j/core:encoders",
"//main/com/google/privacy/differentialprivacy/pipelinedp4j/core:framework_collections",
"//main/com/google/privacy/differentialprivacy/pipelinedp4j/core/budget:budget_allocation_details",
"//main/com/google/privacy/differentialprivacy/pipelinedp4j/core/budget:budget_spec",
"//main/com/google/privacy/differentialprivacy/pipelinedp4j/local:local_collections",
"//main/com/google/privacy/differentialprivacy/pipelinedp4j/local:local_encoders",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -342,15 +342,16 @@ internal constructor(
noiseKind,
) {
override fun run(testMode: TestMode): BeamPCollection<QueryPerGroupResult<GroupKeysT>> {
val beamResult =
(runWithDpEngine(testMode) as BeamFrameworkTable<GroupKeysT, DpAggregates>).data
val dpEngineResult: DpEngineResult<GroupKeysT> = runWithDpEngine(testMode)
val beamAggregationResults =
(dpEngineResult.aggregationResults as BeamFrameworkTable<GroupKeysT, DpAggregates>).data
val coder = QueryPerGroupResultCoder(groupKeyEncoder.coder)
val mapToResultFn =
createConvertDpAggregatesToQueryPerGroupResultFn(
aggregations.outputColumnNamesWithMetricTypes(),
aggregations.outputColumnNameToFeatureIdMap(),
)
return beamResult
return beamAggregationResults
.apply(MapElements.into(coder.encodedTypeDescriptor).via(SerializableFunction(mapToResultFn)))
.setCoder(coder)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,14 +228,15 @@ internal constructor(
noiseKind,
) {
override fun run(testMode: TestMode): Sequence<QueryPerGroupResult<GroupKeysT>> {
val localResult =
(runWithDpEngine(testMode) as LocalFrameworkTable<GroupKeysT, DpAggregates>).data
val dpEngineResult: DpEngineResult<GroupKeysT> = runWithDpEngine(testMode)
val localAggregationResults =
(dpEngineResult.aggregationResults as LocalFrameworkTable<GroupKeysT, DpAggregates>).data
val mapToResultFn =
createConvertDpAggregatesToQueryPerGroupResultFn(
aggregations.outputColumnNamesWithMetricTypes(),
aggregations.outputColumnNameToFeatureIdMap(),
)
return localResult.map(mapToResultFn)
return localAggregationResults.map(mapToResultFn)
}

private fun createConvertDpAggregatesToQueryPerGroupResultFn(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import com.google.privacy.differentialprivacy.pipelinedp4j.core.FrameworkCollect
import com.google.privacy.differentialprivacy.pipelinedp4j.core.FrameworkTable
import com.google.privacy.differentialprivacy.pipelinedp4j.core.MetricType
import com.google.privacy.differentialprivacy.pipelinedp4j.core.SelectPartitionsParams
import com.google.privacy.differentialprivacy.pipelinedp4j.core.budget.BudgetAllocationDetails
import com.google.privacy.differentialprivacy.pipelinedp4j.proto.DpAggregates
import com.google.privacy.differentialprivacy.pipelinedp4j.proto.PerFeature
import com.google.privacy.differentialprivacy.pipelinedp4j.proto.copy
Expand Down Expand Up @@ -71,7 +72,17 @@ protected constructor(
validate()
}

protected fun runWithDpEngine(testMode: TestMode): FrameworkTable<GroupKeysT, DpAggregates> {
/**
* The result of running the DP engine, as returned by [runWithDpEngine].
*
* Bundles the aggregation output together with the budget allocation details that
* [DpEngine.done] reported for the same engine instance, so that callers receive both from a
* single call.
*
* @param GroupKeysT the type of the group keys of [aggregationResults].
* @property aggregationResults the table of [DpAggregates] keyed by group key.
* @property budgetAllocationDetails the list returned by [DpEngine.done] for the engine instance
*   that produced [aggregationResults].
*/
protected data class DpEngineResult<GroupKeysT>(
val aggregationResults: FrameworkTable<GroupKeysT, DpAggregates>,
val budgetAllocationDetails: List<BudgetAllocationDetails>,
)

protected fun runWithDpEngine(testMode: TestMode): DpEngineResult<GroupKeysT> {
val dpEngine =
DpEngine.create(
encoderFactory,
Expand All @@ -84,13 +95,17 @@ protected constructor(
val extractors =
createDataExtractors(valueExtractor = null, vectorExtractor = null, featureId = null)
val result = dpEngine.selectPartitions(data, createSelectPartitionsParams(), extractors)
dpEngine.done()

return result.mapToTable(
"Add empty DpAggregates",
groupKeyEncoder,
encoderFactory.protos(DpAggregates::class),
{ it to DpAggregates.getDefaultInstance() },
val budgetAllocationDetails = dpEngine.done()

return DpEngineResult(
aggregationResults =
result.mapToTable(
"Add empty DpAggregates",
groupKeyEncoder,
encoderFactory.protos(DpAggregates::class),
{ it to DpAggregates.getDefaultInstance() },
),
budgetAllocationDetails = budgetAllocationDetails,
)
}

Expand Down Expand Up @@ -133,42 +148,48 @@ protected constructor(
aggregateWithDpEngine(dpEngine, featureAggregation, listOf(featureAggregation), partitions)
aggResults.add(result)
}
dpEngine.done()
val budgetAllocationDetails = dpEngine.done()

val featureIdPerRun =
if (valueAndVectorAggs.isEmpty()) {
listOf(null)
} else {
valueAndVectorAggs.map { it.getFeatureId() }
}
return aggResults
.zip(featureIdPerRun)
.map { (table, featureId) ->
table.mapValues("TagWithFeatureId", encoderFactory.protos(DpAggregates::class)) { _, agg ->
if (featureId == null) {
agg
} else {
val perFeature = constructPerFeature(agg, featureId)
dpAggregates {
count = agg.count
privacyIdCount = agg.privacyIdCount
this.perFeature += perFeature
val aggregationResults =
aggResults
.zip(featureIdPerRun)
.map { (table, featureId) ->
table.mapValues("TagWithFeatureId", encoderFactory.protos(DpAggregates::class)) { _, agg
->
if (featureId == null) {
agg
} else {
val perFeature = constructPerFeature(agg, featureId)
dpAggregates {
count = agg.count
privacyIdCount = agg.privacyIdCount
this.perFeature += perFeature
}
}
}
}
}
.reduce {
acc: FrameworkTable<GroupKeysT, DpAggregates>,
table: FrameworkTable<GroupKeysT, DpAggregates> ->
acc.flattenWith("FlattenResultsFromMultipleRuns", table)
}
.groupAndCombineValues("MergeDpAggregates") { acc, dpAggregatesFromSingleRun ->
acc.copy {
count += dpAggregatesFromSingleRun.count
privacyIdCount += dpAggregatesFromSingleRun.privacyIdCount
perFeature += dpAggregatesFromSingleRun.perFeatureList
.reduce {
acc: FrameworkTable<GroupKeysT, DpAggregates>,
table: FrameworkTable<GroupKeysT, DpAggregates> ->
acc.flattenWith("FlattenResultsFromMultipleRuns", table)
}
}
.groupAndCombineValues("MergeDpAggregates") { acc, dpAggregatesFromSingleRun ->
acc.copy {
count += dpAggregatesFromSingleRun.count
privacyIdCount += dpAggregatesFromSingleRun.privacyIdCount
perFeature += dpAggregatesFromSingleRun.perFeatureList
}
}
return DpEngineResult(
aggregationResults = aggregationResults,
budgetAllocationDetails = budgetAllocationDetails,
)
}

private fun validate() {
Expand Down Expand Up @@ -277,8 +298,9 @@ protected constructor(
private fun requireDistinctValueExtractors(aggregationsPerValue: List<ValueAggregations<*>>) {
val valueExtractorCounts = aggregationsPerValue.groupingBy { it.valueExtractor }.eachCount()
val duplicates = valueExtractorCounts.filter { it.value > 1 }.keys
val valueAggregationsWithDuplicates =
aggregationsPerValue.filter { it.valueExtractor in duplicates }
val valueAggregationsWithDuplicates = aggregationsPerValue.filter {
it.valueExtractor in duplicates
}
require(duplicates.isEmpty()) {
"There are the same (object reference equality) value extractors used in different aggregateValue() calls. Please merge them into one call." +
"\nValue aggregations with duplicate value extractors:\n${
Expand All @@ -290,8 +312,9 @@ protected constructor(
private fun requireDistinctVectorExtractors(aggregationsPerVector: List<VectorAggregations<*>>) {
val vectorExtractorCounts = aggregationsPerVector.groupingBy { it.vectorExtractor }.eachCount()
val duplicates = vectorExtractorCounts.filter { it.value > 1 }.keys
val vectorAggregationsWithDuplicates =
aggregationsPerVector.filter { it.vectorExtractor in duplicates }
val vectorAggregationsWithDuplicates = aggregationsPerVector.filter {
it.vectorExtractor in duplicates
}
require(duplicates.isEmpty()) {
"There are the same (object reference equality) vector extractors used in different aggregateVector() calls. Please merge them into one call." +
"\nVector aggregations with duplicate vector extractors:\n${
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -329,8 +329,9 @@ internal constructor(
noiseKind,
) {
override fun run(testMode: TestMode): SparkDataset<QueryPerGroupResult<GroupKeysT>> {
val sparkResult =
(runWithDpEngine(testMode) as SparkFrameworkTable<GroupKeysT, DpAggregates>).data
val dpEngineResult: DpEngineResult<GroupKeysT> = runWithDpEngine(testMode)
val sparkAggregationResults =
(dpEngineResult.aggregationResults as SparkFrameworkTable<GroupKeysT, DpAggregates>).data
@Suppress("UNCHECKED_CAST")
val queryPerGroupResultEncoder =
Encoders.kryo(QueryPerGroupResult::class.java) as Encoder<QueryPerGroupResult<GroupKeysT>>
Expand All @@ -339,7 +340,7 @@ internal constructor(
aggregations.outputColumnNamesWithMetricTypes(),
aggregations.outputColumnNameToFeatureIdMap(),
)
return sparkResult.map(MapFunction(mapToResultFn), queryPerGroupResultEncoder)
return sparkAggregationResults.map(MapFunction(mapToResultFn), queryPerGroupResultEncoder)
}

private fun createConvertDpAggregatesToQueryPerGroupResultFn(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ kt_jvm_library(
":framework_collections",
"//main/com/google/privacy/differentialprivacy/pipelinedp4j/core/budget:allocated_budget",
"//main/com/google/privacy/differentialprivacy/pipelinedp4j/core/budget:budget_accountant",
"//main/com/google/privacy/differentialprivacy/pipelinedp4j/core/budget:budget_allocation_details",
"//main/com/google/privacy/differentialprivacy/pipelinedp4j/core/budget:budget_spec",
"//main/com/google/privacy/differentialprivacy/pipelinedp4j/dplibrary:noise_factories",
"//main/com/google/privacy/differentialprivacy/pipelinedp4j/dplibrary:pre_aggregation_partition_selection_factory",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.google.privacy.differentialprivacy.pipelinedp4j.core

import com.google.errorprone.annotations.CanIgnoreReturnValue
import com.google.privacy.differentialprivacy.Noise
import com.google.privacy.differentialprivacy.pipelinedp4j.core.MetricType.COUNT
import com.google.privacy.differentialprivacy.pipelinedp4j.core.MetricType.MEAN
Expand All @@ -35,6 +36,7 @@ import com.google.privacy.differentialprivacy.pipelinedp4j.core.budget.BudgetAcc
import com.google.privacy.differentialprivacy.pipelinedp4j.core.budget.BudgetAccountantFactory
import com.google.privacy.differentialprivacy.pipelinedp4j.core.budget.BudgetAccountingStrategy
import com.google.privacy.differentialprivacy.pipelinedp4j.core.budget.BudgetAccountingStrategy.NAIVE
import com.google.privacy.differentialprivacy.pipelinedp4j.core.budget.BudgetAllocationDetails
import com.google.privacy.differentialprivacy.pipelinedp4j.core.budget.BudgetPerOpSpec
import com.google.privacy.differentialprivacy.pipelinedp4j.core.budget.BudgetRequest
import com.google.privacy.differentialprivacy.pipelinedp4j.core.budget.RelativeBudgetPerOpSpec
Expand Down Expand Up @@ -204,13 +206,26 @@ internal constructor(
}

/**
* Allocates privacy budgets to the metrics whose computation has been requested by calling
* [aggregate]. This method must be called once per [DpEngine] instance.
* Allocates privacy budgets to privacy-preserving operations in [aggregate] and
* [selectPartitions] calls.
*
* Privacy-preserving operations are various aggregation metrics, like COUNT or SUM, and partition
* selection. There might be multiple privacy-preserving operations in a single [DpEngine]
* instance.
*
* This method must be called once per [DpEngine] instance.
*
* @return a list of [BudgetAllocationDetails] for each privacy-preserving operation. This reports
* the actual budgets used during computation, which may include budgets for operations that
* were not directly requested (e.g., for a MEAN aggregation, budget details for both SUM and
* COUNT will be returned).
* @throws IllegalStateException if [done] has already been called on this instance.
*/
fun done() {
@CanIgnoreReturnValue
fun done(): List<BudgetAllocationDetails> {
throwIfDoneWasCalled()
doneCalled = true
budgetAccountant.allocateBudgets()
return budgetAccountant.allocateBudgets()
}

private fun throwIfDoneWasCalled() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ package(
],
)

# Standalone target for BudgetAllocationDetails.kt (the element type of the list returned by
# DpEngine.done()); it declares no deps of its own.
kt_jvm_library(
name = "budget_allocation_details",
srcs = ["BudgetAllocationDetails.kt"],
)

kt_jvm_library(
name = "budget_spec",
srcs = ["BudgetSpec.kt"],
Expand All @@ -43,6 +48,8 @@ kt_jvm_library(
srcs = ["BudgetAccountant.kt"],
deps = [
":allocated_budget",
":budget_allocation_details",
":budget_spec",
"@maven//:com_google_errorprone_error_prone_annotations",
],
)
Loading
Loading