Nous allons dans cette partie détailler le script par blocs. Celui-ci n’est donc pas fonctionnel en l’état, car l’objectif recherché est de présenter la trame et les différentes parties.
Vous trouverez dans le bloc ‘info’ le script complet et fonctionnel (il faut juste mettre un bearer_token).
Vous pouvez télécharger le script entier ici : twitter.py
Dans cette première trame, vous trouverez des commentaires qui redirigent vers la section correspondant à chaque partie pour des détailles.
# Bibliotheque à utiliser
import tweepy
import time
import datetime
from tweepy import StreamingClient, StreamRule
import os
import signal
from elasticsearch import Elasticsearch
# Connection à la base elasticsearch
es = Elasticsearch("http://localhost:9200")
# Token twitter à utiliser
bearer_token = "***"
client = tweepy.Client(bearer_token=bearer_token)
# Mots clé suivis
mots_a_suivre = ['#cyberattaque', '#cyberdéfense', '#FIC2023', '@FIC_eu','@FICAmNord']
# Classe qui nous servira lorsqu'on recevra des tweets : Plus d'informations section "Class TweetPrinterV2"
class TweetPrinterV2(tweepy.StreamingClient):
def on_connect(self):
print("connected")
def on_tweet(self, tweet):
print("Tweet reçu")
# [...]
# Initialisation de la classe TweetPrinterV2
printer = TweetPrinterV2(bearer_token)
# Fonction pour supprimer les règles : Plus d'informations section "Fonction del_rules"
def del_rules(printer):
#[...]
pass
del_rules(printer)
# Appel de la methode filter, informations dans la section "Class TweetPrinterV2"
printer.filter(tweet_fields=["referenced_tweets","created_at","geo", "public_metrics"], expansions=["author_id"], user_fields=["username"])
L’api possède un comportement un peu étrange, du moins qui peut surprendre au début. Les régles (mots qu’on veut suivre), ne sont pas enregistés en local, mais sur l’api. Cela signifie que si j’ajoute les règle “A” et “B”, si je relance le programme sans rêgle, les regles “A” et “B”, seront toujours en cours. Il faut donc, dans notre cas, vider toute les règles existantes et ajouter les nouvelles. C’est ce que fais la fonction del_rules. Nous en avons eu besoin pendant le développement, car parfois nous ne voulons pas supprimer les anciennes règles (la suppression fait 15-20 requètes et nous avons le droit à 50 reqs / 15 min).
La fonction récupère les règles existantes, puis supprime chaque règles. Enfin elle ajoute chaque mot de la liste mots_a_suivre dans les règles.
def del_rules(printer):
# clean-up pre-existing rules
rule_ids = []
result = printer.get_rules()
print("old_rules : ", result.data)
if result.data:
for rule in result.data:
print(f"rule marked to delete: {rule.id} - {rule.value}")
rule_ids.append(rule.id)
if(len(rule_ids) > 0):
printer.delete_rules(rule_ids)
printer = TweetPrinterV2(bearer_token)
else:
print("no rules to delete")
for mots in mots_a_suivre:
rule = StreamRule(value=mots)
printer.add_rules(rule)
La classe TweetPrinterV2 est là pour réimplementer certaines methodes de la classe tweepy.StreamingClient. Notamment on_connect et on-tweet.
La méthode on-connect nous sert a savoir lorsque nous sommes connectés à l’api tweeter. La méthode on-tweet correspond au traitement qu’on fait lorsqu’on récupère un tweet. Nous allons décomposer on-tweet en deux parties, d’un coté ce qui ne parle pas d’Elasticsearch et de l’autre comment on ajoute le tweet dans Elasticsearch.
class TweetPrinterV2(tweepy.StreamingClient):
def on_connect(self):
print("connected")
# Action Lorsqu'un tweet est reçu
def on_tweet(self, tweet):
index_type = "tweet"
if tweet.referenced_tweets:
index_type = tweet.referenced_tweets[0]["type"]
print("type : ", index_type)
author_id = tweet.author_id
user = client.get_users(ids=author_id, user_fields=["username"])
username = user.data[0].username
words = []
for mot in mots_a_suivre:
if (mot.upper() in tweet.text.upper()):
words.append(mot)
# Affichage des informations dans le terminal
print("tweet author id : ", tweet.author_id)
print("tweet author username : ", username)
print("Date : ", tweet.created_at)
print("Text : \n", tweet.text)
metric = tweet.public_metrics
print("nb like : ", metric["like_count"])
print("nb retweets :", metric["retweet_count"])
print("localisation : ", tweet.geo)
print("mots : ", words)
print("-"*50)
# Stockage des information dans la base elasticsearch
es.index(index='bot_twitter', doc_type="tweet", body={
'id' : tweet.id,
'tweet_type' : index_type,
'author_id': author_id,
'author_username': username,
'text': tweet.text,
'date': tweet.created_at,
'location': tweet.geo,
'likes': metric["like_count"],
'retweets': metric["retweet_count"],
'words' : words
})
time.sleep(3)
index_type = "tweet"
if tweet.referenced_tweets:
index_type = tweet.referenced_tweets[0]["type"]
print("type : ", index_type)
author_id = tweet.author_id
user = client.get_users(ids=author_id, user_fields=["username"])
username = user.data[0].username
words = []
for mot in mots_a_suivre:
if (mot.upper() in tweet.text.upper()):
words.append(mot)
# Affichage des informations dans le terminal
print("tweet author id : ", tweet.author_id)
print("tweet author username : ", username)
print("Date : ", tweet.created_at)
print("Text : \n", tweet.text)
metric = tweet.public_metrics
print("nb like : ", metric["like_count"])
print("nb retweets :", metric["retweet_count"])
print("localisation : ", tweet.geo)
print("mots : ", words)
print("-"*50)
Lorsqu’un tweet est reçu, nous commençons par voir si c’est un tweet simple, un retweet, un Quoted Tweet ou une réponse (if tweet.referenced_tweets: ...).
Nous récupérons l’id de l’auteur (author_id = tweet.author_id) puis nous recupérons l’username :
user = client.get_users(ids=author_id, user_fields=["username"])
username = user.data[0].username
Nous devons maintenant savoir quels sont les mots qui ont permis de faire ‘matcher’ ce tweet, en effet pour le moment nous recomparons tout ce qui est dans la liste mots_a_suivre (vu précédement), mais une fois le tweet reçu il n’est pas dit quel mot à déclanché la récupération de celui-ci. Nous allons donc pour chaque tweet, créer une liste qui contiendra les mots qui ‘matchent’ le tweet.
words = []
for mot in mots_a_suivre:
if (mot.upper() in tweet.text.upper()):
words.append(mot)
Maintenant que nous avons toutes les informations, nous les affichons dans le terminal, cela nous permet de voir en temps réel ce qu’il se passe, et si le bot trouve bien des tweets. Il ne nous reste plus qu’à stocker toutes ces informations dans Elasticsearch.
es.index(index='bot_twitter',
doc_type="tweet",
body={
'id' : tweet.id, # Numéro du tweet
'tweet_type' : index_type, # Type de tweet (vu plus haut, tweet, retweet, reply, ...)
'author_id': author_id, # Identifiant de l'auteur
'author_username': username, # Username de l'auteur
'text': tweet.text, # Texte du tweet récupéré par le bot
'date': tweet.created_at, # Date de création
'location': tweet.geo, # Localisation (souvent vide)
'likes': metric["like_count"], # Nombre de like
'retweets': metric["retweet_count"], # Nombre de retweets
'words' : words # Mots suivies qui correspondent au tweets.
}
)
Elasticsearch se trouve être plus simple à utiliser qu’initialement prévu. Après s’être connecté dessus au tout début es = Elasticsearch("http://localhost:9200"), il ne nous reste plus qu’à ajouter des éléments dedans grâce à la fonction es.index().
printer.filter(tweet_fields=["referenced_tweets","created_at","geo", "public_metrics"], expansions=["author_id"], user_fields=["username"])
La méthode filter, permet de spécifier avant de recevoir les tweets, quels sont les champs que l’on veut avoir. Ils sont par la suite utilisés dans la méthode on_tweet.
Ci-dessous, ce que l’on obtient lorsqu’on lance le script, vous pouvez voir au debut les anciennes régles être supprimées, avant que la connexion soit faite.
Vous pouvez télécharger les logs entiers ici : log.txt
(.venv) botme@debian:~/Documents/bot-twitter$ python twitter_bis.py
old_rules : [StreamRule(value='#cyberattaque', tag=None, id='1605263767297728529'), StreamRule(value='#cyberdéfense', tag=None, id='1605263772599328775'), StreamRule(value='#FIC2023', tag=None, id='1605263777821237268'), StreamRule(value='@afpfr', tag=None, id='1605263782263005198'), StreamRule(value='@FIC_eu', tag=None, id='1605263786528710656'), StreamRule(value='#mbappe', tag=None, id='1605263791196889096'), StreamRule(value='@FICAmNord', tag=None, id='1605263795806507008'), StreamRule(value='python', tag=None, id='1605263800885714979')]
rule marked to delete: 1605263767297728529 - #cyberattaque
rule marked to delete: 1605263772599328775 - #cyberdéfense
rule marked to delete: 1605263777821237268 - #FIC2023
rule marked to delete: 1605263782263005198 - @afpfr
rule marked to delete: 1605263786528710656 - @FIC_eu
rule marked to delete: 1605263791196889096 - #mbappe
rule marked to delete: 1605263795806507008 - @FICAmNord
rule marked to delete: 1605263800885714979 - python
connected
type : tweet
tweet author id : 1427660000441245701
tweet author username : IWASInstitute
Date : 2022-12-28 18:54:09+00:00
Text :
Cyber Yankee
#usmc #Marine
#marines #marinecorps #cyber #CyberAttack #cyberattacks #cyberattaque https://t.co/0LenryovC9
nb like : 0
nb retweets : 0
localisation : {}
mots : ['#cyberattaque']
--------------------------------------------------
/home/botme/Documents/bot-twitter/twitter_bis.py:54: DeprecationWarning: The 'body' parameter is deprecated for the 'index' API and will be removed in a future version. Instead use the 'document' parameter. See https://github.com/elastic/elasticsearch-py/issues/1698 for more information
es.index(index='bot_twitter', doc_type="tweet", body={
/home/botme/Documents/bot-twitter/.venv/lib/python3.9/site-packages/elasticsearch/connection/base.py:200: ElasticsearchWarning: [types removal] Specifying types in document index requests is deprecated, use the typeless endpoints instead (/{index}/_doc/{id}, /{index}/_doc, or /{index}/_create/{id}).
warnings.warn(message, category=ElasticsearchWarning)
type : tweet
tweet author id : 243165224
tweet author username : AndyMerkl
Date : 2022-12-28 21:50:26+00:00
Text :
[#BestOf2022 🛡️] In June we were present at #FIC2022!
Rendez-vous at @FIC_eu | 🗓️ 5, 6 & 7 April 2023 | #FIC2023 in Lille 😀
In Cloud should we trust?
Yes we can! #CloudSecurity 👉 https://t.co/BB4lmtFyBx
See you next year 🤗 https://t.co/gDHIjz4L2P
nb like : 0
nb retweets : 0
localisation : {}
mots : ['#FIC2023', '@FIC_eu']
--------------------------------------------------
type : tweet
tweet author id : 827356011459649540
tweet author username : diallo_teams
Date : 2022-12-28 22:54:46+00:00
Text :
🇺🇦 Le ministre #ukrainien de la transformation numérique Fedorov a déclaré à #Bloomberg qu'une #cyberattaque sur l'hébergement vidéo 📲RuTube le 9 mai a été menée par des #pirates pro-ukrainiens« L'armée informatique a "complètement piraté" RuTube, une plate-forme vidéo russe https://t.co/AtWekz7ugS
nb like : 0
nb retweets : 0
localisation : {}
mots : ['#cyberattaque']
--------------------------------------------------
[...]