Skip to content

Commit 79ae0ad

Browse files
Merge pull request #262 from thingsboard/proxy-protocol
Enable/disable proxy protocol per MQTT listener
2 parents e1238c9 + 90f6f87 commit 79ae0ad

11 files changed

+121
-22
lines changed

application/src/main/java/org/thingsboard/mqtt/broker/server/AbstractMqttChannelInitializer.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
import org.thingsboard.mqtt.broker.server.ip.ProxyIpAddressHandler;
3030
import org.thingsboard.mqtt.broker.server.traffic.DuplexTrafficHandler;
3131

32+
import java.util.Objects;
33+
3234
@Slf4j
3335
@Getter
3436
public abstract class AbstractMqttChannelInitializer extends ChannelInitializer<SocketChannel> implements MqttChannelInitializer {
@@ -38,7 +40,7 @@ public abstract class AbstractMqttChannelInitializer extends ChannelInitializer<
3840
@Value("${historical-data-report.enabled:true}")
3941
private boolean historicalDataReportEnabled;
4042
@Value("${listener.proxy_enabled:false}")
41-
private boolean proxyProtocolEnabled;
43+
private boolean globalProxyProtocolEnabled;
4244

4345
protected final MqttHandlerFactory handlerFactory;
4446

@@ -50,7 +52,7 @@ public AbstractMqttChannelInitializer(MqttHandlerFactory handlerFactory) {
5052
public void initChannel(SocketChannel ch) {
5153
ChannelPipeline pipeline = ch.pipeline();
5254

53-
if (proxyProtocolEnabled) {
55+
if (isProxyProtocolEnabled()) {
5456
pipeline.addLast("proxy", new HAProxyMessageDecoder());
5557
pipeline.addLast("ipAdrHandler", new ProxyIpAddressHandler());
5658
} else {
@@ -85,4 +87,8 @@ protected void constructWsPipeline(SocketChannel ch) {
8587

8688
}
8789

90+
private boolean isProxyProtocolEnabled() {
91+
return Objects.requireNonNullElseGet(isListenerProxyProtocolEnabled(), () -> globalProxyProtocolEnabled);
92+
}
93+
8894
}

application/src/main/java/org/thingsboard/mqtt/broker/server/MqttChannelInitializer.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,4 +29,5 @@ default SslHandler getSslHandler() {
2929
return null;
3030
}
3131

32+
Boolean isListenerProxyProtocolEnabled();
3233
}

application/src/main/java/org/thingsboard/mqtt/broker/server/tcp/MqttTcpChannelInitializer.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,4 +52,8 @@ public String getChannelInitializerName() {
5252
return BrokerConstants.TCP;
5353
}
5454

55+
@Override
56+
public Boolean isListenerProxyProtocolEnabled() {
57+
return context.getListenerProxyProtocolEnabled();
58+
}
5559
}

application/src/main/java/org/thingsboard/mqtt/broker/server/tcp/MqttTcpServerContext.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,13 @@
2020
import org.springframework.stereotype.Component;
2121

2222
@Component
23+
@Getter
2324
public class MqttTcpServerContext {
2425

25-
@Getter
2626
@Value("${listener.tcp.netty.max_payload_size}")
2727
private int maxPayloadSize;
28+
29+
@Value("${listener.tcp.proxy_enabled:}")
30+
private Boolean listenerProxyProtocolEnabled;
31+
2832
}

application/src/main/java/org/thingsboard/mqtt/broker/server/tls/MqttSslChannelInitializer.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import lombok.Getter;
2121
import lombok.extern.slf4j.Slf4j;
2222
import org.springframework.beans.factory.annotation.Qualifier;
23-
import org.springframework.beans.factory.annotation.Value;
2423
import org.springframework.stereotype.Component;
2524
import org.thingsboard.mqtt.broker.common.data.BrokerConstants;
2625
import org.thingsboard.mqtt.broker.server.AbstractMqttChannelInitializer;
@@ -34,9 +33,6 @@ public class MqttSslChannelInitializer extends AbstractMqttChannelInitializer {
3433

3534
private final MqttSslServerContext context;
3635

37-
@Value("${listener.ssl.config.enabled_cipher_suites}")
38-
private String[] enabledCipherSuites;
39-
4036
public MqttSslChannelInitializer(MqttHandlerFactory handlerFactory, MqttSslServerContext context) {
4137
super(handlerFactory);
4238
this.context = context;
@@ -59,7 +55,11 @@ public String getChannelInitializerName() {
5955

6056
@Override
6157
public SslHandler getSslHandler() {
62-
return context.getSslHandlerProvider().getSslHandler(enabledCipherSuites, handlerFactory.getClientAuthType());
58+
return context.getSslHandlerProvider().getSslHandler(context.getEnabledCipherSuites(), handlerFactory.getClientAuthType());
6359
}
6460

61+
@Override
62+
public Boolean isListenerProxyProtocolEnabled() {
63+
return context.getListenerProxyProtocolEnabled();
64+
}
6565
}

application/src/main/java/org/thingsboard/mqtt/broker/server/tls/MqttSslServerContext.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,4 +29,11 @@ public class MqttSslServerContext {
2929

3030
@Value("${listener.ssl.netty.max_payload_size}")
3131
private int maxPayloadSize;
32+
33+
@Value("${listener.ssl.proxy_enabled:}")
34+
private Boolean listenerProxyProtocolEnabled;
35+
36+
@Value("${listener.ssl.config.enabled_cipher_suites}")
37+
private String[] enabledCipherSuites;
38+
3239
}

application/src/main/java/org/thingsboard/mqtt/broker/server/ws/MqttWsChannelInitializer.java

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import lombok.Getter;
2020
import lombok.extern.slf4j.Slf4j;
2121
import org.springframework.beans.factory.annotation.Qualifier;
22-
import org.springframework.beans.factory.annotation.Value;
2322
import org.springframework.stereotype.Component;
2423
import org.thingsboard.mqtt.broker.common.data.BrokerConstants;
2524
import org.thingsboard.mqtt.broker.server.AbstractMqttWsChannelInitializer;
@@ -31,22 +30,35 @@
3130
@Getter
3231
public class MqttWsChannelInitializer extends AbstractMqttWsChannelInitializer {
3332

34-
@Value("${listener.ws.netty.sub_protocols}")
35-
private String subprotocols;
36-
@Value("${listener.ws.netty.max_payload_size}")
37-
private int maxPayloadSize;
33+
private final MqttWsServerContext context;
3834

39-
public MqttWsChannelInitializer(MqttHandlerFactory handlerFactory) {
35+
public MqttWsChannelInitializer(MqttHandlerFactory handlerFactory, MqttWsServerContext context) {
4036
super(handlerFactory);
37+
this.context = context;
4138
}
4239

4340
@Override
4441
public void initChannel(SocketChannel ch) {
4542
super.initChannel(ch);
4643
}
4744

45+
@Override
46+
public int getMaxPayloadSize() {
47+
return context.getMaxPayloadSize();
48+
}
49+
4850
@Override
4951
public String getChannelInitializerName() {
5052
return BrokerConstants.WS;
5153
}
54+
55+
@Override
56+
public Boolean isListenerProxyProtocolEnabled() {
57+
return context.getListenerProxyProtocolEnabled();
58+
}
59+
60+
@Override
61+
protected String getSubprotocols() {
62+
return context.getSubprotocols();
63+
}
5264
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/**
2+
* Copyright © 2016-2025 The Thingsboard Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.thingsboard.mqtt.broker.server.ws;
17+
18+
import lombok.Getter;
19+
import org.springframework.beans.factory.annotation.Value;
20+
import org.springframework.stereotype.Component;
21+
22+
@Component
23+
@Getter
24+
public class MqttWsServerContext {
25+
26+
@Value("${listener.ws.netty.max_payload_size}")
27+
private int maxPayloadSize;
28+
29+
@Value("${listener.ws.proxy_enabled:}")
30+
private Boolean listenerProxyProtocolEnabled;
31+
32+
@Value("${listener.ws.netty.sub_protocols}")
33+
private String subprotocols;
34+
35+
}

application/src/main/java/org/thingsboard/mqtt/broker/server/wss/MqttWssChannelInitializer.java

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import lombok.Getter;
2121
import lombok.extern.slf4j.Slf4j;
2222
import org.springframework.beans.factory.annotation.Qualifier;
23-
import org.springframework.beans.factory.annotation.Value;
2423
import org.springframework.stereotype.Component;
2524
import org.thingsboard.mqtt.broker.common.data.BrokerConstants;
2625
import org.thingsboard.mqtt.broker.server.AbstractMqttWsChannelInitializer;
@@ -34,11 +33,6 @@ public class MqttWssChannelInitializer extends AbstractMqttWsChannelInitializer
3433

3534
private final MqttWssServerContext context;
3635

37-
@Value("${listener.wss.netty.sub_protocols}")
38-
private String subprotocols;
39-
@Value("${listener.wss.config.enabled_cipher_suites}")
40-
private String[] enabledCipherSuites;
41-
4236
public MqttWssChannelInitializer(MqttHandlerFactory handlerFactory, MqttWssServerContext context) {
4337
super(handlerFactory);
4438
this.context = context;
@@ -61,6 +55,16 @@ public String getChannelInitializerName() {
6155

6256
@Override
6357
public SslHandler getSslHandler() {
64-
return context.getWssHandlerProvider().getSslHandler(enabledCipherSuites, handlerFactory.getClientAuthType());
58+
return context.getWssHandlerProvider().getSslHandler(context.getEnabledCipherSuites(), handlerFactory.getClientAuthType());
59+
}
60+
61+
@Override
62+
public Boolean isListenerProxyProtocolEnabled() {
63+
return context.getListenerProxyProtocolEnabled();
64+
}
65+
66+
@Override
67+
protected String getSubprotocols() {
68+
return context.getSubprotocols();
6569
}
6670
}

application/src/main/java/org/thingsboard/mqtt/broker/server/wss/MqttWssServerContext.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,4 +29,14 @@ public class MqttWssServerContext {
2929

3030
@Value("${listener.wss.netty.max_payload_size}")
3131
private int maxPayloadSize;
32+
33+
@Value("${listener.wss.proxy_enabled:}")
34+
private Boolean listenerProxyProtocolEnabled;
35+
36+
@Value("${listener.wss.netty.sub_protocols}")
37+
private String subprotocols;
38+
39+
@Value("${listener.wss.config.enabled_cipher_suites}")
40+
private String[] enabledCipherSuites;
41+
3242
}

0 commit comments

Comments
 (0)