|
16 | 16 | use Interop\Queue\Queue;
|
17 | 17 | use Interop\Queue\SubscriptionConsumer;
|
18 | 18 | use Interop\Queue\Topic;
|
19 |
| -use InvalidArgumentException; |
20 | 19 | use RdKafka\Conf;
|
21 | 20 | use RdKafka\KafkaConsumer;
|
22 | 21 | use RdKafka\Producer as VendorProducer;
|
@@ -58,34 +57,6 @@ public function __construct(array $config)
|
58 | 57 | $this->configureSerializer($config);
|
59 | 58 | }
|
60 | 59 |
|
61 |
| - /** |
62 |
| - * @param array $config |
63 |
| - * @return void |
64 |
| - */ |
65 |
| - private function configureSerializer(array $config): void |
66 |
| - { |
67 |
| - if (!isset($config['serializer'])) { |
68 |
| - $this->setSerializer(new JsonSerializer()); |
69 |
| - return; |
70 |
| - } |
71 |
| - |
72 |
| - if (is_string($config['serializer'])) { |
73 |
| - $this->setSerializer(new $config['serializer']()); |
74 |
| - } elseif (is_array($config['serializer']) && isset($config['serializer']['class'])) { |
75 |
| - $serializerClass = $config['serializer']['class']; |
76 |
| - $serializerOptions = $config['serializer']['options'] ?? []; |
77 |
| - if (!empty($serializerOptions)) { |
78 |
| - $this->setSerializer(new $serializerClass($serializerOptions)); |
79 |
| - } else { |
80 |
| - $this->setSerializer(new $serializerClass()); |
81 |
| - } |
82 |
| - } elseif ($config['serializer'] instanceof Serializer) { |
83 |
| - $this->setSerializer($config['serializer']); |
84 |
| - } else { |
85 |
| - throw new InvalidArgumentException('Invalid serializer configuration'); |
86 |
| - } |
87 |
| - } |
88 |
| - |
89 | 60 | /**
|
90 | 61 | * @return RdKafkaMessage
|
91 | 62 | */
|
@@ -208,6 +179,58 @@ public static function getLibrdKafkaVersion(): string
|
208 | 179 | return "$major.$minor.$patch";
|
209 | 180 | }
|
210 | 181 |
|
| 182 | + /** |
| 183 | + * @return void |
| 184 | + * JsonSerializer should be the default fallback if no serializer is specified |
| 185 | + */ |
| 186 | + private function configureSerializer(array $config): void |
| 187 | + { |
| 188 | + if (!isset($config['serializer'])) { |
| 189 | + $this->setSerializer(new JsonSerializer()); |
| 190 | + |
| 191 | + return; |
| 192 | + } |
| 193 | + |
| 194 | + $serializer = $config['serializer']; |
| 195 | + |
| 196 | + if ($serializer instanceof Serializer) { |
| 197 | + $this->setSerializer($serializer); |
| 198 | + |
| 199 | + return; |
| 200 | + } |
| 201 | + |
| 202 | + $serializerClass = $this->resolveSerializerClass($serializer); |
| 203 | + |
| 204 | + if (!class_exists($serializerClass) || !is_a($serializerClass, Serializer::class, true)) { |
| 205 | + throw $this->createInvalidSerializerException($serializerClass); |
| 206 | + } |
| 207 | + |
| 208 | + $serializerOptions = $serializer['options'] ?? []; |
| 209 | + $this->setSerializer(new $serializerClass($serializerOptions)); |
| 210 | + } |
| 211 | + |
| 212 | + private function resolveSerializerClass(mixed $serializer): string |
| 213 | + { |
| 214 | + if (is_string($serializer)) { |
| 215 | + return $serializer; |
| 216 | + } |
| 217 | + |
| 218 | + if (is_array($serializer) && isset($serializer['class'])) { |
| 219 | + return $serializer['class']; |
| 220 | + } |
| 221 | + |
| 222 | + throw $this->createInvalidSerializerException($serializer); |
| 223 | + } |
| 224 | + |
| 225 | + private function createInvalidSerializerException(mixed $value): \InvalidArgumentException |
| 226 | + { |
| 227 | + return new \InvalidArgumentException(sprintf( |
| 228 | + 'Invalid serializer configuration. Expected "serializer" to be a string, an array with a "class" key, or a %s instance. Received %s instead.', |
| 229 | + Serializer::class, |
| 230 | + get_debug_type($value) |
| 231 | + )); |
| 232 | + } |
| 233 | + |
211 | 234 | private function getConf(): Conf
|
212 | 235 | {
|
213 | 236 | if (null === $this->conf) {
|
|
0 commit comments