[Kafka][Symfony] Utiliser KAFKA + Symfony pour envoyer les logs dans ELK (EleasticSearch)
Nous allons voir comment configurer les divers outils pour envoyé les logs de symfonys dans ELK en passant par kafka
Installer les extensions permettant de communiquer avec kafka
Dans un premier temps, nous allons installer le nécessaire sur notre poste pour pouvoir communiquer avec kafka.
Avec linux / docker
Dans votre DockerFile ajouter les lignes suivantes.
1 2 3 |
#install rdkafka RUN apt-get install librdkafka-dev -y RUN pecl install rdkafka |
Avec Windows
Avec windows les chose se complique un peut (comme d’habitude).
Téléchagerles DLL de rdkafka correspondant à votre version de php : https://pecl.php.net/package/rdkafka
- Déplacer librdkafka.dll dans ‘C:\Windows\System32’
- Déplacer php_rdkafka.dll dans le dossier ‘ext’ de php et l’activer dans vos php.ini (celui de votre version de php + celui de WAMP si vous l’utiliser) en ajoutant ‘extension=php_rdkafka.dll’
- télécharger https://www.nuget.org/packages/librdkafka.redist/0.11.6-RC2 et déplacer le dans ‘C:\Windows\System32’
installation et configuration du bundle permettant d’envoyer des messages a kafka
Installation
Le bundle permettant la communication avec kafka est le suivant : widicorp/kafka-bundle
Installer le :
1 |
composer require widicorp/kafka-bundle |
Configuration
Puis configurons le bundle pour communique avec notre serveur (fichier config/packages/kafka.yaml). Les seuls paramètres que vous avez à changer dans votre application sont les ‘topics’ vers lesquels vous allez envoyer des messages.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
widicorp_kafka: services_name_prefix: 'kafka' # préfixe utiliser lorsque nous allons appeler le service event_dispatcher: false # on ne déclanche pas d'évènement pour chaque commande producers: # liste des producers Kafka (les producers envoie des messages sur des topics) logs: # nom du producer configuration: queue.buffering.max.ms: 1000 # Maximum time, in milliseconds, for buffering data on the producer queue. 1000ms by default. brokers: - '%env(kafka_broker)%' # liste des brokers kafka, aujourd'hui, nous n'en avons qu'un (172.22.1.48:9092) events_poll_timeout: 2000 # temps entre chaque envoie vers kafka. Symfony enverra les messages en grouppé toute les x millisencode topics: # listes de topics vers lesquel nous allons envoyer des messages mon_site-log: # topic kafka nommée mon_site-log' avec une strategie de partition a 2 strategy_partition: 2 logger: enabled: false # on ne veux pas logguer les infos de kafka via symfony |
Utiliser un ‘producer’
Il est désormais possible de récupérer le ‘producer’ comme un service qui a comme nom ‘kafka.producer.logs’ et de l’utiliser :
1 |
$producer->produce("Mon message (en json le plus souvent)"); |
Automatisation des log Symfony vers kafka
Maintenant que nous savons envoyer des messages vers kafka, nous allons faire en sorte d’envoyer automatiquement les messages de logs de niveau erreur au minimum, vers kafka.
configuration de monolog
Nous allons ajouter un ‘handler’ dans monolog qui va transmettre les log à un service que nous allons créer ensuite.
monolog.yaml
1 2 3 4 5 6 |
monolog: handlers: ........ kafka: type: service id: KafkaHandler |
Création du service
Nous avons demander a monolog d’envoyer les logs vers le service KafkaHandler. Notre service doit étendre use Monolog\Handler\AbstractProcessingHandler.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
<?php namespace App\Logger; use Monolog\Logger; use Monolog\Handler\AbstractProcessingHandler; class KafkaHandler extends AbstractProcessingHandler { protected $producer; public function __construct($roducer, $level = Logger::DEBUG, bool $bubble = true) { $this->producer = $roducer; parent::__construct($level, $bubble); } protected function write(array $record): void { $this->producer->produce(json_encode($record)); } } |
Déclarons le service dans services.yaml. Le second paramètre permet d’indiquer que nous ne concevrons que les logs du niveau ‘error’ ou plus.
1 2 3 |
KafkaHandler: class: App\Logger\KafkaHandler arguments: ['@kafka.producer.logs', 'error'] |
Ecouter les messages du topic mon_site-log et les envoyé dans ELK (ElasticSearch)
Nous allons utiliser le bunlde widicorp/kafka-bundle pour consommer les messages d’un topic et les envoyer vers ELK.
Pour cela j’ai créer un projet KafkaConsumer. Voyons comment il fonctionne.
fichier kafka.yaml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
widicorp_kafka: services_name_prefix: 'kafka' event_dispatcher: false consumers: log_to_elk: # nom du consummer configuration: metadata.broker.list: '%env(kafka_broker)%' # broker disponnible group.id: 'kafka_to_elasticsearch' # identifiant du groupe de consommateur. Permet a kafka de savoir qu'un groupe a consommer un message ou non (deux groupes différents consommerons les mêmes messages) enable.auto.commit: 1 # on va commité le faire d'avoir lu le message sinon il sera relu à chauqe lancement du consommateur topicConfiguration: auto.offset.reset: 'smallest' timeout_consuming_queue: 200 message_handler: 'App\KafkaConsumer\ToElasticsearch' # le service qui va traiter les messages topics: # liste des topic que l'on veut consommer - mon_site-log logger: enabled: false |
Et le service qui va traiter les messages ‘src/KafkaConsumer/ToElasticsearch.php’ :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 |
<?php namespace App\KafkaConsumer; use Widicorp\KafkaBundle\Handler\MessageHandlerAbstract; use RdKafka\Message; class ToElasticsearch extends MessageHandlerAbstract { /** * @var url */ private $elkUrl; public function setElasticsearchUrl(string $elkUrl) { $this->elkUrl = $elkUrl; } public function process(Message $message) { /** * Forme de l'objet message : * object(RdKafka\Message)#386 (7) { * ["err"]=> // * int(0) * ["topic_name"]=> // Returns the source topic of the message. Necessary if you listen more than one topic * string(8) "catwoman" * ["partition"]=> // Returns the partition number where the message is stored. * int(0) * ["payload"]=> // Returns the payload of the message, or None if there is no payload. * string(497) "{"message":"User was reloaded from a user provider.","context":{"provider":"FOS\\UserBundle\\Security\\EmailUserProvider","username":"Jeremy_R"},"level":100,"level_name":"DEBUG","channel":"security","datetime":{"date":"2019-04-18 17:57:42.804132","timezone_type":3,"timezone":"Europe\/Paris"},"extra":[],"formatted":"[2019-04-18 17:57:42] security.DEBUG: User was reloaded from a user provider. {\"provider\":\"FOS\\\\UserBundle\\\\Security\\\\EmailUserProvider\",\"username\":\"Jeremy_R\"} []\n"}" * ["len"]=> // lenght of the message * int(497) * ["key"]=> // Returns the key of the message, or None if there is no key. * NULL * ["offset"]=> // Returns the offset of the message. * int(879) * } */ if ($message->topic_name == 'cdr_web-log') { $curl = new \Curl\Curl(); $curl->post($this->elkUrl . '/mon_site/log/', $message->payload); } } } |
Ici, si le message est sur le topic ‘mon_site-log’ on le poste dans l’index ‘mon_site’ avec le type ‘log’ dans ElasticSearch.
Pour lancer la consommation de des d’un topic il faut lancer le sript de Symfony widicorp:kafka:consume avec comme argument le nom de notre consommateur :
1 |
php bin/console widicorp:kafka:consume log_to_elk |