|
30 | 30 | MAX_ERROR_COUNT = 10 |
31 | 31 |
|
32 | 32 | TMP_TABLE_JOIN_LIMIT = 100 |
| 33 | +# Max number of LFNs inserted per chunk into the temporary MEMORY table used to look up file IDs |
| 34 | +GET_FILE_ID_BULK_SIZE = 30_000 |
33 | 35 | ############################################################################# |
34 | 36 |
|
35 | 37 |
|
@@ -1324,27 +1326,36 @@ def __getFileIDsForLfns(self, lfns, connection=False): |
1324 | 1326 |
|
1325 | 1327 | if not lfns: |
1326 | 1328 | return ({}, {}) |
1327 | | - # Create temporary table for LFNs |
1328 | | - sqlCmd = "CREATE TEMPORARY TABLE to_query_LFNs (LFN VARCHAR(255) NOT NULL, PRIMARY KEY (LFN)) ENGINE=MEMORY;" |
1329 | | - returnValueOrRaise(self._update(sqlCmd, conn=connection)) |
1330 | 1329 |
|
1331 | | - try: |
1332 | | - # Insert LFNs into temporary table |
1333 | | - sqlCmd = "INSERT INTO to_query_LFNs (LFN) VALUES ( %s )" |
1334 | | - returnValueOrRaise(self._updatemany(sqlCmd, [(lfn,) for lfn in lfns], conn=connection)) |
| 1330 | + lfns_to_ids = {} |
| 1331 | + ids_to_lfns = {} |
1335 | 1332 |
|
1336 | | - # Query using JOIN with temporary table |
1337 | | - req = "SELECT df.LFN, df.FileID FROM DataFiles df JOIN to_query_LFNs t ON df.LFN = t.LFN;" |
1338 | | - res = returnValueOrRaise(self._query(req, conn=connection)) |
| 1333 | + for lfn_bulk in breakListIntoChunks(lfns, GET_FILE_ID_BULK_SIZE): |
| 1334 | + # Create temporary table for LFNs |
| 1335 | + sqlCmd = ( |
| 1336 | + "CREATE TEMPORARY TABLE to_query_LFNs (LFN VARCHAR(255) NOT NULL, PRIMARY KEY (LFN)) ENGINE=MEMORY;" |
| 1337 | + ) |
| 1338 | + returnValueOrRaise(self._update(sqlCmd, conn=connection)) |
1339 | 1339 |
|
1340 | | - lfns = dict(res) |
1341 | | - # Reverse dictionary |
1342 | | - fids = {fileID: lfn for lfn, fileID in lfns.items()} |
1343 | | - return (fids, lfns) |
1344 | | - finally: |
1345 | | - # Clean up temporary table |
1346 | | - sqlCmd = "DROP TEMPORARY TABLE to_query_LFNs" |
1347 | | - self._update(sqlCmd, conn=connection) |
| 1340 | + try: |
| 1341 | + # Insert LFNs into temporary table |
| 1342 | + sqlCmd = "INSERT INTO to_query_LFNs (LFN) VALUES ( %s )" |
| 1343 | + returnValueOrRaise(self._updatemany(sqlCmd, [(lfn,) for lfn in lfn_bulk], conn=connection)) |
| 1344 | + |
| 1345 | + # Query using JOIN with temporary table |
| 1346 | + req = "SELECT df.LFN, df.FileID FROM DataFiles df JOIN to_query_LFNs t ON df.LFN = t.LFN;" |
| 1347 | + res = returnValueOrRaise(self._query(req, conn=connection)) |
| 1348 | + |
| 1349 | + lfns_to_ids.update(res) |
| 1350 | + # The reverse (FileID -> LFN) dictionary is built once, after all chunks have been processed |
| 1351 | + |
| 1352 | + finally: |
| 1353 | + # Clean up temporary table |
| 1354 | + sqlCmd = "DROP TEMPORARY TABLE to_query_LFNs" |
| 1355 | + self._update(sqlCmd, conn=connection) |
| 1356 | + |
| 1357 | + ids_to_lfns = {fileID: lfn for lfn, fileID in lfns_to_ids.items()} |
| 1358 | + return (ids_to_lfns, lfns_to_ids) |
1348 | 1359 |
|
1349 | 1360 | def __getLfnsForFileIDs(self, fileIDs, connection=False): |
1350 | 1361 | """Get lfns for the given list of fileIDs""" |
|
0 commit comments