This commit is contained in:
Tykayn 2025-07-28 12:15:43 +02:00 committed by tykayn
parent cdd4d6e549
commit a080ab844f
15 changed files with 392 additions and 80 deletions

83
2_move_france_folder.py Normal file
View file

@ -0,0 +1,83 @@
import os
import random
import piexif
from PIL import Image
#username="konink360"
username="raymond"
counter=0
# Définition de la bounding box de la France métropolitaine
FRANCE_METROPOLITAINE = {
'min_lat': 42.25,
'max_lat': 51.1,
'min_lon': -4.8,
'max_lon': 8.2
}
def get_random_jpg_file(dossier):
"""Renvoie le chemin d'un fichier JPG au hasard dans le dossier"""
jpg_files = [f for f in os.listdir(dossier) if f.endswith('.jpg')]
if not jpg_files:
return None
return os.path.join(dossier, random.choice(jpg_files))
def get_exif_info(file_path):
"""Renvoie les informations EXIF de la photo"""
try:
exif_dict = piexif.load(file_path)
return exif_dict
except Exception as e:
print(f"Erreur lors de la lecture des EXIF : {e}")
return None
def is_in_france(exif_info):
"""Vérifie si la photo a des informations de géolocalisation situées en France métropolitaine"""
if not exif_info or 'GPS' not in exif_info:
return False
gps_info = exif_info['GPS']
lat_ref = gps_info[1]
lat = gps_info[2]
lon_ref = gps_info[3]
lon = gps_info[4]
if lat_ref == b'N':
signe_lat = 1
else:
signe_lat = -1
if lon_ref == b'E':
signe_lon = 1
else:
signe_lon = -1
# Convertir les tuples en nombres
lat = signe_lat * (lat[0][0] / lat[0][1] + lat[1][0] / (lat[1][1] * 60) + lat[2][0] / (lat[2][1] * 3600))
lon = signe_lon * (lon[0][0] / lon[0][1] + lon[1][0] / (lon[1][1] * 60) + lon[2][0] / (lon[2][1] * 3600))
return (FRANCE_METROPOLITAINE['min_lat'] <= lat <= FRANCE_METROPOLITAINE['max_lat'] and
FRANCE_METROPOLITAINE['min_lon'] <= lon <= FRANCE_METROPOLITAINE['max_lon'])
def move_to_found_in_france(dossier):
"""Déplace le dossier vers found_in_france"""
dest_folder = 'found_in_france'
if not os.path.exists(dest_folder):
os.makedirs(dest_folder)
os.rename(dossier, os.path.join(dest_folder, os.path.basename(dossier)))
print('dossier déplacé:',counter, dossier, dest_folder)
def main():
root_folder = 'data/'+username
for dossier in os.listdir(root_folder):
dossier_path = os.path.join(root_folder, dossier)
# print(dossier_path)
if os.path.isdir(dossier_path):
jpg_file = get_random_jpg_file(dossier_path)
# print(jpg_file)
if jpg_file:
exif_info = get_exif_info(jpg_file)
# print(exif_info)
if exif_info and is_in_france(exif_info):
move_to_found_in_france(dossier_path)
# counter += 1
print(f"Dossier {dossier} déplacé vers found_in_france")
if __name__ == '__main__':
main()

1
R_A_one.sh Normal file
View file

@ -0,0 +1 @@
/usr/bin/python3 mapillary_download.py --access_token='MLY|6793676763984675|2bd7b145b02ea002a52b8968aaa62be2' --sequence_id='JTkO0zJAkyln3qWn9f7NVQ' --username=R_A

View file

@ -5,6 +5,15 @@
# Liste des usernames
# example:
# usernames=( "riri" "fifi" "loulou")
#usernames=( "irgaresh1" "albanmorlot" "zaneo" "bernardvoyageur" "pelderson" "emerzh" "donlouba")
#usernames=("tedscougv" "gaptpro" "zlplhr" "derfred" "reisender" "jonnymccullagh" "jasskurn" "yopaseopor") # keep it empty to be asked each time you run the script which account to export
#usernames=("raymond")
#usernames=("thierry1030" "konink360")
#usernames=("didier2020") # et aussi deleted_user
#usernames=("alainproviste")
#usernames=("ratzillas") # et tdelmas
#usernames=("niquarl")
#usernames=("cyrillelargillier")
usernames=()
if test -z $usernames; then

67
csv_to_geojson.ts Normal file
View file

@ -0,0 +1,67 @@
import fs from 'fs';
import path from 'path';
import csvParser from 'csv-parser';
import minimist from 'minimist';
import { Feature, FeatureCollection, Point } from 'geojson';
interface Options {
dir: string;
file: string;
latColumn: string;
lonColumn: string;
hasHeaders: boolean;
}
/**
* @name csvToGeoJSON
* @description conversion de csv vers geojson
*
* Utilisation:
* node csv_to_geojson.ts -d ./etalab_data/panneaux -f mon_fichier.csv -lat GPSLatitude -lon GPSLongitude -h
*
**/
function csvToGeoJSON(options: Options): FeatureCollection<Point> {
const { dir, file, latColumn, lonColumn, hasHeaders } = options;
const filePath = path.join(dir, file);
const features: Feature<Point>[] = [];
fs.createReadStream(filePath)
.pipe(csvParser({ headers: hasHeaders }))
.on('data', (row) => {
const lat = parseFloat(row[latColumn]);
const lon = parseFloat(row[lonColumn]);
features.push({
type: 'Feature',
geometry: {
type: 'Point',
coordinates: [lon, lat],
},
properties: row,
});
})
.on('end', () => {
const geoJSON: FeatureCollection<Point> = {
type: 'FeatureCollection',
features,
};
fs.writeFileSync(`${file}.geojson`, JSON.stringify(geoJSON, null, 2));
console.log(`GeoJSON créé avec succès : ${file}.geojson`);
});
return features;
}
const args = minimist<Options>(process.argv.slice(2), {
alias: {
dir: 'd',
file: 'f',
latColumn: 'lat',
lonColumn: 'lon',
hasHeaders: 'h',
},
default: {
hasHeaders: true,
},
});
csvToGeoJSON(args);

0
curl_land.sh Normal file → Executable file
View file

View file

@ -214,12 +214,12 @@ if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument(
"--source_dir",
default="/home/cipherbliss/Téléchargements/FIBRELAND/TEST_IN_FR/",
default="/home/poule/encrypted/stockage-syncable/www/development/html/mapillary_download/data/",
help="Chemin du répertoire source",
)
parser.add_argument(
"--destination_dir",
default="/home/cipherbliss/Téléchargements/FIBRELAND/IN_FRANCE/",
default="/home/poule/encrypted/stockage-syncable/www/development/html/mapillary_download/data_IN_FRANCE/",
help="Chemin du répertoire destination",
)
parser.add_argument(

2
find_user_id.sh Normal file → Executable file
View file

@ -10,6 +10,6 @@ ID=$(echo "$response" | jq -r '.data.user_by_username.id')
echo "ID: $ID"
curl "https://graph.mapillary.com/graphql?doc=query%20getLatestActivity(%24id%3A%20ID!%2C%20%24first%3A%20Int%2C%20%24after%3A%20ID%2C%20%24hide_after%3A%20Int)%20%7B%0A%20%20%20%20%20%20fetch__User(id%3A%20%24id)%20%7B%0A%20%20%20%20%20%20%20%20id%0A%20%20%20%20%20%20%20%20feed(first%3A%20%24first%2C%20after%3A%20%24after%2C%20hide_failed_sequences_after_days%3A%20%24hide_after)%20%7B%0A%20%20%20%20%20%20%20%20%20%20page_info%20%7B%0A%20%20%20%20%20%20%20%20%20%20%20%20start_cursor%0A%20%20%20%20%20%20%20%20%20%20%20%20end_cursor%0A%20%20%20%20%20%20%20%20%20%20%20%20has_next_page%0A%20%20%20%20%20%20%20%20%20%20%20%20has_previous_page%0A%20%20%20%20%20%20%20%20%20%20%7D%0A%20%20%20%20%20%20%20%20%20%20nodes%20%7B%0A%20%20%20%20%20%20%20%20%20%20%20%20cluster_id%20type%20created_at_seconds%20captured_at_seconds%20thumb_url%20item_count%20image_id%20status%20initial_processing_status%20anonymization_status%20tiler_status%20error_code%20timezone%0A%20%20%20%20%20%20%20%20%20%20%20%20__typename%0A%20%20%20%20%20%20%20%20%20%20%7D%0A%20%20%20%20%20%20%20%20%20%20__typename%0A%20%20%20%20%20%20%20%20%7D%0A%20%20%20%20%20%20%20%20__typename%0A%20%20%20%20%20%20%7D%0A%20%20%20%20%20%20__typename%0A%20%20%20%20%7D&query=query%20getLatestActivity(%24id%3A%20ID!%2C%20%24first%3A%20Int%2C%20%24after%3A%20ID%2C%20%24hide_after%3A%20Int)%20%7B%0A%20%20fetch__User(id%3A%20%24id)%20%7B%0A%20%20%20%20id%0A%20%20%20%20feed(%0A%20%20%20%20%20%20first%3A%20%24first%0A%20%20%20%20%20%20after%3A%20%24after%0A%20%20%20%20%20%20hide_failed_sequences_after_days%3A%20%24hide_after%0A%20%20%20%20)%20%7B%0A%20%20%20%20%20%20page_info%20%7B%0A%20%20%20%20%20%20%20%20start_cursor%0A%20%20%20%20%20%20%20%20end_cursor%0A%20%20%20%20%20%20%20%20has_next_page%0A%20%20%20%20%20%20%20%20has_previous_page%0A%20%20%20%20%20%20%20%20__typename%0A%20%20%20%20%20%20%7D%0A%20%20%20%20%20%20nodes%20%7B%0A%20%20%20%20%20%20%20%20cluster_id%0A%20%20%20%20%20%20%20%20type%0A%20%20%20%20%20%20%20%20created_at_seconds%0A%20%20%20%20%20%20%20%20captured_at_seconds%0A%20%20%20%20%20%20%20%20thumb_url%0A%20%20%20%20%20%20%20%20item_count%0A%20%20%20%20%20%20%20%20image_id%0A%20%20%20%20%20%20%20%20status%0A%20%20%20%20%20%20%20%20initial_processing_status%0A%20%20%20%20%20%20%20%20anonymization_status%0A%20%20%20%20%20%20%20%20tiler_status%0A%20%20%20%20%20%20%20%20error_code%0A%20%20%20%20%20%20%20%20timezone%0A%20%20%20%20%20%20%20%20__typename%0A%20%20%20%20%20%20%7D%0A%20%20%20%20%20%20__typename%0A%20%20%20%20%7D%0A%20%20%20%20__typename%0A%20%20%7D%0A%20%20__typename%0A%7D&operationName=getLatestActivity&variables=%7B%22id%22%3A%22${ID}%22%2C%22first%22%3A10000%2C%22after%22%3Anull%2C%22hide_after%22%3A14%7D" --compressed -H 'User-Agent: Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:127.0) Gecko/20100101 Firefox/127.0' -H 'Accept: */*' -H 'Accept-Language: fr,en-US;q=0.7,en;q=0.3' -H 'Accept-Encoding: gzip, deflate, br, zstd' -H 'Referer: https://www.mapillary.com/' -H 'content-type: application/json' -H 'authorization: OAuth MLYARA3tSkHGXL0kEKYPx49q2BjzoZCfpZAl9HO7R8YdUKT99yMZB2pJxPzkSd3khd6C1ZBPgrUYZCE3wlsCG3ZC4UNn4RuJZChbIzmRfiE10ZA7eX06KGEhUb9yIA8HZBzyKg2PRlflG3h0pwZDZD' -H 'Origin: https://www.mapillary.com' -H 'Connection: keep-alive' -H 'Sec-Fetch-Dest: empty' -H 'Sec-Fetch-Mode: cors' -H 'Sec-Fetch-Site: same-site' -H 'Sec-GPC: 1' -H 'Priority: u=4' -H 'TE: trailers' -sS > "out_${1}.json"
curl "https://graph.mapillary.com/graphql?doc=query%20getLatestActivity(%24id%3A%20ID!%2C%20%24first%3A%20Int%2C%20%24after%3A%20ID%2C%20%24hide_after%3A%20Int)%20%7B%0A%20%20%20%20%20%20fetch__User(id%3A%20%24id)%20%7B%0A%20%20%20%20%20%20%20%20id%0A%20%20%20%20%20%20%20%20feed(first%3A%20%24first%2C%20after%3A%20%24after%2C%20hide_failed_sequences_after_days%3A%20%24hide_after)%20%7B%0A%20%20%20%20%20%20%20%20%20%20page_info%20%7B%0A%20%20%20%20%20%20%20%20%20%20%20%20start_cursor%0A%20%20%20%20%20%20%20%20%20%20%20%20end_cursor%0A%20%20%20%20%20%20%20%20%20%20%20%20has_next_page%0A%20%20%20%20%20%20%20%20%20%20%20%20has_previous_page%0A%20%20%20%20%20%20%20%20%20%20%7D%0A%20%20%20%20%20%20%20%20%20%20nodes%20%7B%0A%20%20%20%20%20%20%20%20%20%20%20%20cluster_id%20type%20created_at_seconds%20captured_at_seconds%20thumb_url%20item_count%20image_id%20status%20initial_processing_status%20anonymization_status%20tiler_status%20error_code%20timezone%0A%20%20%20%20%20%20%20%20%20%20%20%20__typename%0A%20%20%20%20%20%20%20%20%20%20%7D%0A%20%20%20%20%20%20%20%20%20%20__typename%0A%20%20%20%20%20%20%20%20%7D%0A%20%20%20%20%20%20%20%20__typename%0A%20%20%20%20%20%20%7D%0A%20%20%20%20%20%20__typename%0A%20%20%20%20%7D&query=query%20getLatestActivity(%24id%3A%20ID!%2C%20%24first%3A%20Int%2C%20%24after%3A%20ID%2C%20%24hide_after%3A%20Int)%20%7B%0A%20%20fetch__User(id%3A%20%24id)%20%7B%0A%20%20%20%20id%0A%20%20%20%20feed(%0A%20%20%20%20%20%20first%3A%20%24first%0A%20%20%20%20%20%20after%3A%20%24after%0A%20%20%20%20%20%20hide_failed_sequences_after_days%3A%20%24hide_after%0A%20%20%20%20)%20%7B%0A%20%20%20%20%20%20page_info%20%7B%0A%20%20%20%20%20%20%20%20start_cursor%0A%20%20%20%20%20%20%20%20end_cursor%0A%20%20%20%20%20%20%20%20has_next_page%0A%20%20%20%20%20%20%20%20has_previous_page%0A%20%20%20%20%20%20%20%20__typename%0A%20%20%20%20%20%20%7D%0A%20%20%20%20%20%20nodes%20%7B%0A%20%20%20%20%20%20%20%20cluster_id%0A%20%20%20%20%20%20%20%20type%0A%20%20%20%20%20%20%20%20created_at_seconds%0A%20%20%20%20%20%20%20%20captured_at_seconds%0A%20%20%20%20%20%20%20%20thumb_url%0A%20%20%20%20%20%20%20%20item_count%0A%20%20%20%20%20%20%20%20image_id%0A%20%20%20%20%20%20%20%20status%0A%20%20%20%20%20%20%20%20initial_processing_status%0A%20%20%20%20%20%20%20%20anonymization_status%0A%20%20%20%20%20%20%20%20tiler_status%0A%20%20%20%20%20%20%20%20error_code%0A%20%20%20%20%20%20%20%20timezone%0A%20%20%20%20%20%20%20%20__typename%0A%20%20%20%20%20%20%7D%0A%20%20%20%20%20%20__typename%0A%20%20%20%20%7D%0A%20%20%20%20__typename%0A%20%20%7D%0A%20%20__typename%0A%7D&operationName=getLatestActivity&variables=%7B%22id%22%3A%22${ID}%22%2C%22first%22%3A10000%2C%22after%22%3Anull%2C%22hide_after%22%3A14%7D&bbox=42.25,-4.77,51.10,9.57" --compressed -H 'User-Agent: Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:127.0) Gecko/20100101 Firefox/127.0' -H 'Accept: */*' -H 'Accept-Language: fr,en-US;q=0.7,en;q=0.3' -H 'Accept-Encoding: gzip, deflate, br, zstd' -H 'Referer: https://www.mapillary.com/' -H 'content-type: application/json' -H 'authorization: OAuth MLYARA3tSkHGXL0kEKYPx49q2BjzoZCfpZAl9HO7R8YdUKT99yMZB2pJxPzkSd3khd6C1ZBPgrUYZCE3wlsCG3ZC4UNn4RuJZChbIzmRfiE10ZA7eX06KGEhUb9yIA8HZBzyKg2PRlflG3h0pwZDZD' -H 'Origin: https://www.mapillary.com' -H 'Connection: keep-alive' -H 'Sec-Fetch-Dest: empty' -H 'Sec-Fetch-Mode: cors' -H 'Sec-Fetch-Site: same-site' -H 'Sec-GPC: 1' -H 'Priority: u=4' -H 'TE: trailers' -sS > "out_${1}.json"
echo " lancez: python3 get_sequences_of_username.py --username=\"$1\" --dev_token='$MAPILLARY_DEV_TOKEN' --max_sequence=99999; bash text_array_to_download_script.py --username=$1 --dev_token='$MAPILLARY_DEV_TOKEN'"

View file

@ -34,14 +34,29 @@ def get_image_data_from_sequences():
username = args.username
input_file = "out_" + username + ".json"
print('get_image_data_from_sequences input_file', input_file)
# Chargement du fichier JSON d'entrée
with open(input_file, "r") as file:
input_data = json.load(file)
# Itération sur les noeuds pour collectionner les image_ids
# Vérification de l'existence des clés dans le chemin d'accès
if "data" not in input_data:
print("Erreur: 'data' n'existe pas dans le fichier JSON")
return
if "fetch__User" not in input_data["data"]:
print("Erreur: 'fetch__User' n'existe pas dans les données")
return
if "feed" not in input_data["data"]["fetch__User"]:
print("Erreur: 'feed' n'existe pas pour cet utilisateur")
return
if "nodes" not in input_data["data"]["fetch__User"]["feed"]:
print("Erreur: 'nodes' n'existe pas dans le feed")
return
nodelist = input_data["data"]["fetch__User"]["feed"]["nodes"]
print("séquences : ", len(nodelist))
image_ids = [node["image_id"] for node in nodelist]
image_ids = [node["image_id"] for node in nodelist if "image_id" in node]
print(image_ids)
dev_token = args.dev_token
@ -51,7 +66,14 @@ def get_image_data_from_sequences():
ii = 0
limit_requests = 1000000000
# limit_requests = 5 # pour tester
FRANCE_MIN_LAT = 42.25
FRANCE_MIN_LON = -4.77
FRANCE_MAX_LAT = 51.10
FRANCE_MAX_LON = 9.57
bbox_filter = f"&bbox={FRANCE_MIN_LAT},{FRANCE_MIN_LON},{FRANCE_MAX_LAT},{FRANCE_MAX_LON}"
# Boucle sur chaque image_id pour interroger l'API Mapillary
for image_id in image_ids:
ii += 1
@ -63,8 +85,9 @@ def get_image_data_from_sequences():
+ "?access_token="
+ dev_token
+ "&fields=id,sequence"
+ bbox_filter
)
# print("requete: "+request_url)
print("requete graphql : "+request_url)
response = requests.get(request_url)
@ -80,8 +103,9 @@ def get_image_data_from_sequences():
print(
"séquence trouvée: "
+ str(ii)
+ "/"
+ args.max_sequence
# TODO fix calculation of max sequence
# + "/"
# + args.max_sequence
+ " : "
+ raw_response["sequence"]
)

View file

@ -29,21 +29,36 @@ def parse_args(argv=None):
default=None,
help="Limit to a bounding box, e.g. '-5.5,47.3,-1.2,48.9', use http://bboxfinder.com",
)
parser.add_argument(
"--fr",
type=bool,
default=True,
help="Limit to the Fr metropolitan bouding box",
)
global args
args = parser.parse_args(argv)
FRANCE_MIN_LAT = 42.25
FRANCE_MIN_LON = -4.77
FRANCE_MAX_LAT = 51.10
FRANCE_MAX_LON = 9.57
if __name__ == "__main__":
print('images_par_username')
parse_args()
mly_key = args.access_token
creator_username = args.username
max_img = args.pictures
if args.fr:
bbox_filter = f"&bbox={FRANCE_MIN_LAT},{FRANCE_MIN_LON},{FRANCE_MAX_LAT},{FRANCE_MAX_LON}"
bbox_filter = f'&bbox={args.bbox}' if args.bbox is not None else ''
url = f"https://graph.mapillary.com/images?access_token={mly_key}&creator_username={creator_username}&limit={max_img}&fields=id,sequence{bbox_filter}"
print(url)
print("prendre les séquences de l'utilisateur :",url)
response = requests.get(url)

View file

@ -22,7 +22,10 @@ session.mount("https://", HTTPAdapter(max_retries=retries_strategies))
def parse_args(argv=None):
parser = argparse.ArgumentParser()
parser.add_argument("--access_token", type=str, help="Your mapillary access token")
parser.add_argument("--access_token",
type=str,
required=True,
help="Your mapillary access token")
parser.add_argument(
"--sequence_ids",
type=str,
@ -124,6 +127,7 @@ def get_image_data_from_sequences(sequences_id, mly_header):
url = "https://graph.mapillary.com/image_ids?sequence_id={}".format(sequence_id)
r = requests.get(url, headers=header)
data = r.json()
image_ids = data["data"]
total_image = len(image_ids)
print(

35
move_already_sent.py Normal file
View file

@ -0,0 +1,35 @@
import os
import shutil
# Définition des chemins
found_in_france_path = 'found_in_france'
already_sent_path = 'already_sent'
# Vérification de l'existence des dossiers
if not os.path.exists(found_in_france_path):
print(f"Le dossier {found_in_france_path} n'existe pas.")
exit(1)
if not os.path.exists(already_sent_path):
os.makedirs(already_sent_path)
# Parcours des sous-dossiers
for dir_name in os.listdir(found_in_france_path):
dir_path = os.path.join(found_in_france_path, dir_name)
# Vérifier si c'est un dossier
if os.path.isdir(dir_path):
# Rechercher le fichier panoramax.txt dans le dossier
file_path = os.path.join(dir_path, '_panoramax.txt')
if os.path.exists(file_path):
# Déplacer le dossier vers already_sent
new_dir_path = os.path.join(already_sent_path, dir_name)
try:
shutil.move(dir_path, new_dir_path)
print(f"Dossier {dir_name} déplacé vers {already_sent_path}")
except Exception as e:
print(f"Erreur lors du déplacement de {dir_name} : {e}")
else:
print(f"Aucun fichier panoramax.txt trouvé dans {dir_name}")
else:
print(f"{dir_name} n'est pas un dossier.")

83
move_france_folder.py Normal file
View file

@ -0,0 +1,83 @@
import os
import random
import piexif
from PIL import Image
#username="konink360"
username="konink360"
counter=0
# Définition de la bounding box de la France métropolitaine
FRANCE_METROPOLITAINE = {
'min_lat': 42.25,
'max_lat': 51.1,
'min_lon': -4.8,
'max_lon': 8.2
}
def get_random_jpg_file(dossier):
"""Renvoie le chemin d'un fichier JPG au hasard dans le dossier"""
jpg_files = [f for f in os.listdir(dossier) if f.endswith('.jpg')]
if not jpg_files:
return None
return os.path.join(dossier, random.choice(jpg_files))
def get_exif_info(file_path):
"""Renvoie les informations EXIF de la photo"""
try:
exif_dict = piexif.load(file_path)
return exif_dict
except Exception as e:
print(f"Erreur lors de la lecture des EXIF : {e}")
return None
def is_in_france(exif_info):
"""Vérifie si la photo a des informations de géolocalisation situées en France métropolitaine"""
if not exif_info or 'GPS' not in exif_info:
return False
gps_info = exif_info['GPS']
lat_ref = gps_info[1]
lat = gps_info[2]
lon_ref = gps_info[3]
lon = gps_info[4]
if lat_ref == b'N':
signe_lat = 1
else:
signe_lat = -1
if lon_ref == b'E':
signe_lon = 1
else:
signe_lon = -1
# Convertir les tuples en nombres
lat = signe_lat * (lat[0][0] / lat[0][1] + lat[1][0] / (lat[1][1] * 60) + lat[2][0] / (lat[2][1] * 3600))
lon = signe_lon * (lon[0][0] / lon[0][1] + lon[1][0] / (lon[1][1] * 60) + lon[2][0] / (lon[2][1] * 3600))
return (FRANCE_METROPOLITAINE['min_lat'] <= lat <= FRANCE_METROPOLITAINE['max_lat'] and
FRANCE_METROPOLITAINE['min_lon'] <= lon <= FRANCE_METROPOLITAINE['max_lon'])
def move_to_found_in_france(dossier):
"""Déplace le dossier vers found_in_france"""
dest_folder = 'found_in_france'
if not os.path.exists(dest_folder):
os.makedirs(dest_folder)
os.rename(dossier, os.path.join(dest_folder, os.path.basename(dossier)))
print('dossier déplacé:',counter, dossier, dest_folder)
def main():
root_folder = 'data/'+username
for dossier in os.listdir(root_folder):
dossier_path = os.path.join(root_folder, dossier)
# print(dossier_path)
if os.path.isdir(dossier_path):
jpg_file = get_random_jpg_file(dossier_path)
# print(jpg_file)
if jpg_file:
exif_info = get_exif_info(jpg_file)
# print(exif_info)
if exif_info and is_in_france(exif_info):
move_to_found_in_france(dossier_path)
# counter += 1
print(f"Dossier {dossier} déplacé vers found_in_france")
if __name__ == '__main__':
main()

0
secrets_variables_example.sh Normal file → Executable file
View file

29
test_error_handling.py Normal file
View file

@ -0,0 +1,29 @@
import json
import os
import sys
# Create a test JSON file with missing feed key
test_data = {
"data": {
"fetch__User": {
# "feed" key is missing
}
}
}
# Write test data to a temporary file
test_username = "test_user"
test_file = f"out_{test_username}.json"
with open(test_file, "w") as f:
json.dump(test_data, f)
print(f"Created test file: {test_file}")
print("Running get_sequences_of_username.py with test data...")
# Run the script with the test data
os.system(f"python3 get_sequences_of_username.py --username={test_username} --dev_token=test_token")
# Clean up
os.remove(test_file)
print(f"Removed test file: {test_file}")

View file

@ -31,8 +31,7 @@ class PictureMetadata:
direction: Optional[float] = None
orientation: Optional[int] = 1
class Writer:
class Writer():
def __init__(self, picture: bytes) -> None:
self.content = picture
self.image = pyexiv2.ImageData(picture)
@ -54,11 +53,7 @@ class Writer:
if self.updated_xmp:
self.image.modify_xmp(self.updated_xmp)
except Exception as e:
print(
"exception \nexif: {}\nxmp: {}".format(
self.updated_exif, self.updated_xmp
)
)
print("exception \nexif: {}\nxmp: {}".format(self.updated_exif, self.updated_xmp))
def close(self) -> None:
self.image.close()
@ -70,12 +65,7 @@ class Writer:
"""
Override exif metadata on raw picture and return updated bytes
"""
if (
not metadata.capture_time
and not metadata.longitude
and not metadata.latitude
and not metadata.picture_type
):
if not metadata.capture_time and not metadata.longitude and not metadata.latitude and not metadata.picture_type:
return
if metadata.capture_time:
@ -93,20 +83,12 @@ class Writer:
Add latitude and longitude values in GPSLatitude + GPSLAtitudeRef and GPSLongitude + GPSLongitudeRef
"""
if metadata.latitude is not None:
self.updated_exif["Exif.GPSInfo.GPSLatitudeRef"] = (
"N" if metadata.latitude > 0 else "S"
)
self.updated_exif["Exif.GPSInfo.GPSLatitude"] = self._to_exif_dms(
metadata.latitude
)
self.updated_exif["Exif.GPSInfo.GPSLatitudeRef"] = "N" if metadata.latitude > 0 else "S"
self.updated_exif["Exif.GPSInfo.GPSLatitude"] = self._to_exif_dms(metadata.latitude)
if metadata.longitude is not None:
self.updated_exif["Exif.GPSInfo.GPSLongitudeRef"] = (
"E" if metadata.longitude > 0 else "W"
)
self.updated_exif["Exif.GPSInfo.GPSLongitude"] = self._to_exif_dms(
metadata.longitude
)
self.updated_exif["Exif.GPSInfo.GPSLongitudeRef"] = "E" if metadata.longitude > 0 else "W"
self.updated_exif["Exif.GPSInfo.GPSLongitude"] = self._to_exif_dms(metadata.longitude)
def add_altitude(self, metadata: PictureMetadata, precision: int = 1000) -> None:
"""
@ -116,24 +98,18 @@ class Writer:
if altitude is not None:
negative_altitude = 0 if altitude >= 0 else 1
self.updated_exif["Exif.GPSInfo.GPSAltitude"] = (
f"{int(abs(altitude * precision))} / {precision}"
)
self.updated_exif["Exif.GPSInfo.GPSAltitudeRef"] = negative_altitude
self.updated_exif['Exif.GPSInfo.GPSAltitude'] = f"{int(abs(altitude * precision))} / {precision}"
self.updated_exif['Exif.GPSInfo.GPSAltitudeRef'] = negative_altitude
def add_direction(
self, metadata: PictureMetadata, ref: str = "T", precision: int = 1000
) -> None:
def add_direction(self, metadata: PictureMetadata, ref: str = 'T', precision: int = 1000) -> None:
"""
Add direction value in GPSImgDirection and GPSImgDirectionRef
"""
direction = metadata.direction
if metadata.direction is not None:
self.updated_exif["Exif.GPSInfo.GPSImgDirection"] = (
f"{int(abs(direction % 360.0 * precision))} / {precision}"
)
self.updated_exif["Exif.GPSInfo.GPSImgDirectionRef"] = ref
self.updated_exif['Exif.GPSInfo.GPSImgDirection'] = f"{int(abs(direction % 360.0 * precision))} / {precision}"
self.updated_exif['Exif.GPSInfo.GPSImgDirectionRef'] = ref
def add_gps_datetime(self, metadata: PictureMetadata) -> None:
"""
@ -144,20 +120,14 @@ class Writer:
metadata.capture_time = self.localize(metadata.capture_time, metadata)
# for capture time, override GPSInfo time and DatetimeOriginal
self.updated_exif["Exif.Photo.DateTimeOriginal"] = (
metadata.capture_time.strftime("%Y:%m:%d %H:%M:%S")
)
self.updated_exif["Exif.Photo.DateTimeOriginal"] = metadata.capture_time.strftime("%Y:%m:%d %H:%M:%S")
offset = metadata.capture_time.utcoffset()
if offset is not None:
self.updated_exif["Exif.Photo.OffsetTimeOriginal"] = self.format_offset(
offset
)
self.updated_exif["Exif.Photo.OffsetTimeOriginal"] = self.format_offset(offset)
utc_dt = metadata.capture_time.astimezone(tz=pytz.UTC)
self.updated_exif["Exif.GPSInfo.GPSDateStamp"] = utc_dt.strftime("%Y:%m:%d")
self.updated_exif["Exif.GPSInfo.GPSTimeStamp"] = utc_dt.strftime(
"%H/1 %M/1 %S/1"
)
self.updated_exif["Exif.GPSInfo.GPSTimeStamp"] = utc_dt.strftime("%H/1 %M/1 %S/1")
def add_datetimeoriginal(self, metadata: PictureMetadata) -> None:
"""
@ -168,18 +138,12 @@ class Writer:
metadata.capture_time = self.localize(metadata.capture_time, metadata)
# for capture time, override DatetimeOriginal and SubSecTimeOriginal
self.updated_exif["Exif.Photo.DateTimeOriginal"] = (
metadata.capture_time.strftime("%Y:%m:%d %H:%M:%S")
)
self.updated_exif["Exif.Photo.DateTimeOriginal"] = metadata.capture_time.strftime("%Y:%m:%d %H:%M:%S")
offset = metadata.capture_time.utcoffset()
if offset is not None:
self.updated_exif["Exif.Photo.OffsetTimeOriginal"] = self.format_offset(
offset
)
self.updated_exif["Exif.Photo.OffsetTimeOriginal"] = self.format_offset(offset)
if metadata.capture_time.microsecond != 0:
self.updated_exif["Exif.Photo.SubSecTimeOriginal"] = (
metadata.capture_time.strftime("%f")
)
self.updated_exif["Exif.Photo.SubSecTimeOriginal"] = metadata.capture_time.strftime("%f")
def add_img_projection(self, metadata: PictureMetadata) -> None:
"""
@ -198,15 +162,15 @@ class Writer:
if metadata.artist is not None:
self.updated_exif["Exif.Image.Artist"] = ascii(metadata.artist).strip("'")
def add_camera_make(self, metadata: PictureMetadata) -> None:
"""
Add camera manufacture in Exif Make tag
"""
if metadata.camera_make is not None:
self.updated_exif["Exif.Image.Make"] = ascii(metadata.camera_make).strip(
"'"
)
self.updated_exif["Exif.Image.Make"] = ascii(metadata.camera_make).strip("'")
def add_camera_model(self, metadata: PictureMetadata) -> None:
"""
@ -214,9 +178,7 @@ class Writer:
"""
if metadata.camera_model is not None:
self.updated_exif["Exif.Image.Model"] = ascii(metadata.camera_model).strip(
"'"
)
self.updated_exif["Exif.Image.Model"] = ascii(metadata.camera_model).strip("'")
def format_offset(self, offset: timedelta) -> str:
"""Format offset for OffsetTimeOriginal. Format is like "+02:00" for paris offset