Data Engineer¶
Apache Kafka Partition assignments¶

Optimize the performance of the Apache Kafka cluster by assigning topic partitions to brokers intelligently. The goal is to minimize the latency between brokers (for instance, for replication or leader-election workloads) while dividing the partitions reasonably evenly across the brokers.
As a Data Engineer at a software company, you are tasked with optimizing the performance of your Apache Kafka cluster. Kafka topics are used to manage data streams, and each topic is divided into partitions. Efficient partitioning of these topics can significantly reduce latency and improve throughput. Your goal is to determine the optimal number of partitions for each topic and assign these partitions to brokers in a way that balances the load and minimizes data transfer between brokers.
Given the following data about your Kafka cluster and topics, use integer linear programming to find the optimal solution.
Objective:
Maximize throughput by minimizing data transfer between brokers.
For every topic, the pairwise transfer rates between brokers are weighted by the number of partitions of that topic assigned to each broker, and these per-topic costs are summed over all topics.
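As a concrete illustration: the B1–B2 transfer rate in the data below is 5 in both directions, so a hypothetical assignment (the partition counts here are made up, not part of the data) that puts 8 partitions of topic T1 on broker B1 and 6 on broker B2 would contribute
\[5 \cdot 8 \cdot 6 + 5 \cdot 6 \cdot 8 = 480\]
to the objective for that broker pair, since the rate is counted once per ordered pair of brokers.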
Constraints:
- Partition Capacity Constraint: The number of partitions assigned to each broker must not exceed the broker's maximum capacity.
- Topic Partition Constraint: The number of partitions assigned to each topic must lie within the specified range; every topic must be assigned at least its minimum number of partitions.
- Broker Load Balance Constraint: The total number of partitions assigned to each broker should be as balanced as possible, with no broker deviating more than 20% from the average across brokers (see the worked example after this list).
- Partition Distribution Constraint: For each topic, no broker can hold more than 40% of the partitions.
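To illustrate the load balance constraint with a hypothetical total (the total number of partitions is itself a decision): if the model creates 60 partitions across the 4 brokers, the average is 15 per broker, so each broker must hold between \(0.8 \cdot 15 = 12\) and \(1.2 \cdot 15 = 18\) partitions.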
Data:
The brokers can be found in kafka_brokers.csv, which has the following columns: Broker_ID,Max_Partitions
The topics can be found in kafka_topics.csv, which has the following columns: Topic_ID,Min_Partitions,Max_Partitions
The data transfer rates can be found in kafka_rates.csv, which has the following columns: From_Broker,To_Broker,Transfer_Rate
Broker_ID,Max_Partitions
B1,50
B2,60
B3,55
B4,65
Topic_ID,Min_Partitions,Max_Partitions
T1,10,20
T2,15,25
T3,8,18
T4,12,22
From_Broker,To_Broker,Transfer_Rate
B1,B2,5
B1,B3,3
B1,B4,4
B2,B1,5
B2,B3,4
B2,B4,7
B3,B1,3
B3,B2,4
B3,B4,2
B4,B1,4
B4,B2,7
B4,B3,2
🔢 Problem Definition
Given the data on brokers, topics, and transfer rates, we need to define the following optimization problem:
Sets
B: Set of brokers, indexed by \(b\).
T: Set of topics, indexed by \(t\).
Parameters
\(\text{MaxPartitions}_b\): Maximum number of partitions that broker \(b\) can handle.
\(\text{MinPartitions}_t\): Minimum number of partitions for topic \(t\).
\(\text{MaxPartitions}_t\): Maximum number of partitions for topic \(t\).
\(\text{TransferRate}_{b1,b2}\): Transfer rate from broker \(b1\) to broker \(b2\).
Decision Variables
\(x_{t,b}\): Number of partitions of topic \(t\) assigned to broker \(b\) (a non-negative integer).
Objective
Maximize throughput by minimizing data transfer between brokers. This is achieved by minimizing the total transfer cost across all topics and broker pairs:
\[\min \sum_{t \in T} \; \sum_{\substack{b_1, b_2 \in B \\ b_1 \neq b_2}} \text{TransferRate}_{b_1,b_2} \cdot x_{t,b_1} \cdot x_{t,b_2}\]
Constraints
Partition Capacity Constraint: The number of partitions assigned to each broker must not exceed the broker’s maximum capacity.
\[\sum_{t \in T} x_{t,b} \leq \text{MaxPartitions}_b, \quad \forall b \in B\]
Topic Partition Constraint: The number of partitions assigned to each topic must lie within the specified range.
\[\text{MinPartitions}_t \leq \sum_{b \in B} x_{t,b} \leq \text{MaxPartitions}_t, \quad \forall t \in T\]
Broker Load Balance Constraint: The total number of partitions assigned to each broker should be as balanced as possible, with no broker deviating more than 20% from the broker average.
\[\begin{aligned}
\sum_{t \in T} x_{t,b} &\leq 1.2 \cdot \frac{\sum_{t \in T} \sum_{b' \in B} x_{t,b'}}{|B|}, \quad \forall b \in B\\
\sum_{t \in T} x_{t,b} &\geq 0.8 \cdot \frac{\sum_{t \in T} \sum_{b' \in B} x_{t,b'}}{|B|}, \quad \forall b \in B
\end{aligned}\]
Partition Distribution Constraint: For each topic, no broker can hold more than 40% of the partitions.
\[x_{t,b} \leq 0.4 \cdot \sum_{b' \in B} x_{t,b'}, \quad \forall t \in T, \forall b \in B\]
from gurobipy import Model, GRB, quicksum
import pandas as pd
# Load the data from the uploaded CSV files
brokers_df = pd.read_csv('path_to/kafka_brokers.csv')
topics_df = pd.read_csv('path_to/kafka_topics.csv')
transfer_rates_df = pd.read_csv('path_to/kafka_rates.csv')
# Extract data from dataframes for model input
brokers = brokers_df['Broker_ID'].tolist()
topics = topics_df['Topic_ID'].tolist()
max_partitions_broker = dict(zip(brokers_df['Broker_ID'], brokers_df['Max_Partitions']))
min_partitions_topic = dict(zip(topics_df['Topic_ID'], topics_df['Min_Partitions']))
max_partitions_topic = dict(zip(topics_df['Topic_ID'], topics_df['Max_Partitions']))
transfer_rates = {}
for i, row in transfer_rates_df.iterrows():
    transfer_rates[(row['From_Broker'], row['To_Broker'])] = row['Transfer_Rate']
# Adjust the transfer_rates dictionary to handle self-transfers (which should be zero)
for b in brokers:
    transfer_rates[(b, b)] = 0  # No transfer cost within the same broker
# Create model
model = Model('Kafka_Partition_Optimization')
# Decision variables: x[t,b] = number of partitions of topic t assigned to broker b
x = model.addVars(topics, brokers, vtype=GRB.INTEGER, name="x")
# Objective: Minimize data transfer between brokers
model.setObjective(
    quicksum(transfer_rates[(b1, b2)] * x[t, b1] * x[t, b2] for t in topics for b1 in brokers for b2 in brokers),
    GRB.MINIMIZE
)
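# Note: the objective multiplies pairs of integer decision variables, so this is a
# (generally non-convex) mixed-integer quadratic program. Depending on the Gurobi
# version in use (an assumption about your environment), non-convex quadratics may
# need to be enabled explicitly, for example:
# model.Params.NonConvex = 2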
# Constraint 1: Partition Capacity Constraint
model.addConstrs(
    (quicksum(x[t, b] for t in topics) <= max_partitions_broker[b] for b in brokers),
    name="PartitionCapacity"
)
# Constraint 2: Topic Partition Constraint
model.addConstrs(
    (quicksum(x[t, b] for b in brokers) >= min_partitions_topic[t] for t in topics),
    name="MinTopicPartitions"
)
model.addConstrs(
    (quicksum(x[t, b] for b in brokers) <= max_partitions_topic[t] for t in topics),
    name="MaxTopicPartitions"
)
# Constraint 3: Broker Load Balance Constraint
total_partitions = quicksum(x[t, b] for t in topics for b in brokers)
average_partitions_per_broker = total_partitions / len(brokers)
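# Note: average_partitions_per_broker is itself an expression in the decision variables,
# so the +/-20% band below is relative to whatever total number of partitions the model
# decides to create, not to a fixed, precomputed value.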
model.addConstrs(
    (quicksum(x[t, b] for t in topics) <= 1.2 * average_partitions_per_broker for b in brokers),
    name="LoadBalanceUpper"
)
model.addConstrs(
    (quicksum(x[t, b] for t in topics) >= 0.8 * average_partitions_per_broker for b in brokers),
    name="LoadBalanceLower"
)
# Constraint 4: Partition Distribution Constraint
model.addConstrs(
    (x[t, b] <= 0.4 * quicksum(x[t, b_] for b_ in brokers) for t in topics for b in brokers),
    name="PartitionDistribution"
)
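# Note: because no broker may hold more than 40% of a topic's partitions, every topic's
# partitions are necessarily spread over at least three brokers (two brokers together
# could cover at most 80% of them).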
# Optimize model
model.optimize()
# Extract the solution (values are rounded to integers to avoid floating-point artifacts)
solution = {t: {b: int(round(x[t, b].X)) for b in brokers} for t in topics}
# Display the solution
for t in topics:
    print(f"Topic {t}:")
    for b in brokers:
        print(f"  Broker {b}: {solution[t][b]} partitions")
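The script above reads variable values without first checking the solver status. A minimal guard, shown here as an optional addition rather than part of the original script, would be:
# Optional: confirm the solver actually found an optimal solution before reading it.
if model.Status == GRB.OPTIMAL:
    print(f"Total weighted transfer cost: {model.ObjVal:.1f}")
else:
    print(f"No optimal solution found (solver status {model.Status})")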
Snowflake: Optimizing clustering keys¶
As a Data Engineer at a software company, you are tasked with optimizing the performance of a critical application running on Snowflake. The database handles a large volume of read and write operations, and the current table structures and clustering keys are not optimized, leading to slow query performance and higher costs associated with compute time and storage.
Your goal is to design the most efficient clustering strategy and optimize materialized views to enhance query performance while minimizing the impact on storage and compute costs.
You have a set of queries that are frequently run against the database. Each query has a different frequency and selectivity (the percentage of rows filtered by the query). Optimizing clustering keys and materialized views on the columns used in these queries can significantly improve their performance but will also increase storage usage and maintenance costs.
Objective: Maximize the total query performance improvement. This is calculated by taking the sum of the chosen queries' Performance_Improvement values and dividing it by the total number of queries.
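For example (a hypothetical selection, not necessarily the optimal one): choosing only Q1, Q2, and Q6 would give an average improvement of \((25 + 20 + 35) / 9 \approx 8.9\%\). Because the number of queries \(n\) is fixed, maximizing this average selects exactly the same set of queries as maximizing the plain sum of improvements.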
Constraints:
- Compute Maintenance Cost Constraint: The total additional compute cost introduced by the clustering keys and materialized views maintenance should not exceed 20 units.
- Storage Cost Constraint: The total storage used by all optimizations should not exceed 45 GB.
- Query Performance Constraint: The total performance improvement of the selected queries must be at least 20% of the total improvement available across all queries.
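With the data below, the available improvements sum to \(25 + 20 + 15 + 17 + 30 + 35 + 36 + 10 + 18 = 206\), so the selected queries must contribute at least \(0.20 \times 206 = 41.2\) percentage points of improvement (as formalized in the constraints section below).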
Data:
The data for this problem can be found in the attached CSV file, which has the following columns: Query_ID,Frequency,Selectivity (%),Compute_Maintenance_Cost,Storage_Cost (GB),Performance_Improvement (%)
Query_ID,Frequency,Selectivity (%),Compute_Maintenance_Cost,Storage_Cost (GB),Performance_Improvement (%)
Q1,5000,10,3,10,25
Q2,3000,15,4,8,20
Q3,2000,20,2,5,15
Q4,2000,25,3,6,17
Q5,1000,30,5,12,30
Q6,4000,25,6,15,35
Q7,4000,30,7,16,36
Q8,1500,5,1,3,10
Q9,2500,10,2,7,18
🔢 Problem Definition
We need to optimize the selection of clustering keys and materialized views to maximize the overall query performance improvement, subject to constraints on compute maintenance cost, storage cost, and ensuring a minimum total performance improvement.
Decision Variables
Let \(x_i\) be a binary decision variable for each query \(i\), where:
\(x_i = 1\) if we choose to optimize the query with the given clustering key or materialized view.
\(x_i = 0\) otherwise.
Parameters
\(c_i\): Compute maintenance cost for query \(i\)
\(s_i\): Storage cost for query \(i\)
\(p_i\): Performance improvement for query \(i\)
\(n\): Total number of queries
Objective
Maximize the average performance improvement:
\[\max \; \frac{1}{n} \sum_{i=1}^{n} p_i \cdot x_i\]
Constraints
Compute Maintenance Cost Constraint: The total compute maintenance cost must not exceed 20 units.
\[\sum_{i=1}^{n} c_i \cdot x_i \leq 20\]
Storage Cost Constraint: The total storage used must not exceed 45 GB.
\[\sum_{i=1}^{n} s_i \cdot x_i \leq 45\]
Query Performance Constraint: Ensure the selected queries deliver at least 20% of the total available performance improvement.
\[\sum_{i=1}^{n} p_i \cdot x_i \geq 0.20 \times \sum_{i=1}^{n} p_i\]
from gurobipy import Model, GRB, quicksum
import pandas as pd
# Load the CSV file
file_path = '/mnt/data/snowflake.csv'
data = pd.read_csv(file_path)
# Extract data from the dataframe
n = len(data)
compute_costs = data['Compute_Maintenance_Cost'].tolist()
storage_costs = data['Storage_Cost (GB)'].tolist()
performance_improvements = data['Performance_Improvement (%)'].tolist()
# Initialize the model
model = Model("Snowflake_Optimization")
# Decision variables: x_i = 1 if we optimize the query i, otherwise 0
x = model.addVars(n, vtype=GRB.BINARY, name="x")
# Objective: Maximize the average performance improvement
model.setObjective(quicksum(performance_improvements[i] * x[i] for i in range(n)) / n, GRB.MAXIMIZE)
# Constraints
# Compute Maintenance Cost Constraint
model.addConstr(quicksum(compute_costs[i] * x[i] for i in range(n)) <= 20, "Compute_Cost")
# Storage Cost Constraint
model.addConstr(quicksum(storage_costs[i] * x[i] for i in range(n)) <= 45, "Storage_Cost")
# Query Performance Constraint: Ensure at least 20% total performance improvement
total_performance_improvement = sum(performance_improvements)
model.addConstr(quicksum(performance_improvements[i] * x[i] for i in range(n)) >= 0.20 * total_performance_improvement, "Performance_Improvement")
# Optimize the model
model.optimize()
# Extract and report the results
selected_queries = [i for i in range(n) if x[i].X > 0.5]
total_performance = sum(performance_improvements[i] for i in selected_queries)
average_performance = total_performance / n
total_compute_cost = sum(compute_costs[i] for i in selected_queries)
total_storage_cost = sum(storage_costs[i] for i in selected_queries)
print(f"Selected queries: {[data['Query_ID'].iloc[i] for i in selected_queries]}")
print(f"Average performance improvement: {average_performance:.2f}%")
print(f"Total compute maintenance cost: {total_compute_cost}")
print(f"Total storage cost: {total_storage_cost} GB")
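Because this instance has only nine binary variables, the ILP result can be cross-checked by brute force. The sketch below is an optional addition, not part of the original solution; it assumes the same snowflake.csv file and path used above.
from itertools import product
import pandas as pd
# Enumerate all 2^9 selections and keep the best feasible one.
data = pd.read_csv('/mnt/data/snowflake.csv')
c = data['Compute_Maintenance_Cost'].tolist()
s = data['Storage_Cost (GB)'].tolist()
p = data['Performance_Improvement (%)'].tolist()
n = len(data)
best_value, best_selection = None, None
for selection in product([0, 1], repeat=n):
    compute = sum(c[i] * selection[i] for i in range(n))
    storage = sum(s[i] * selection[i] for i in range(n))
    perf = sum(p[i] * selection[i] for i in range(n))
    if compute <= 20 and storage <= 45 and perf >= 0.20 * sum(p):
        if best_value is None or perf > best_value:
            best_value, best_selection = perf, selection
if best_selection is not None:
    print(f"Best average improvement by brute force: {best_value / n:.2f}%")
    print("Selected:", [data['Query_ID'].iloc[i] for i in range(n) if best_selection[i] == 1])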