Source code for hepdata.modules.records.utils.records_update_utils

"""Update INSPIRE publication information."""

import datetime
import math

from hepdata.modules.records.utils.doi_minter import generate_dois_for_submission
from hepdata.modules.submission.api import get_latest_hepsubmission
from hepdata.modules.submission.models import DataSubmission
from hepdata.modules.records.utils.common import get_record_by_id
from hepdata.modules.records.utils.workflow import update_record
from hepdata.modules.inspire_api.views import get_inspire_record_information
from hepdata.ext.elasticsearch.api import index_record_ids, push_data_keywords
from hepdata.modules.email.api import notify_publication_update
from hepdata.resilient_requests import resilient_requests
from hepdata.config import TESTING

from celery import shared_task
import logging

logging.basicConfig()
log = logging.getLogger(__name__)


RECORDS_PER_PAGE = 1000


[docs]@shared_task def update_record_info(inspire_id, send_email=False): """Update publication information from INSPIRE for a specific record.""" if inspire_id is None: log.error("Inspire ID is None") return 'Inspire ID is None' inspire_id = inspire_id.replace("ins", "") hep_submission = get_latest_hepsubmission(inspire_id=inspire_id) if hep_submission is None: log.warning("Failed to retrieve HEPData submission for Inspire ID {0}".format(inspire_id)) return 'No HEPData submission' publication_recid = hep_submission.publication_recid log.info("Updating recid {} with information from Inspire record {}".format(publication_recid, inspire_id)) updated_inspire_record_information, status = get_inspire_record_information(inspire_id) if status == 'success': # Also need to update publication information for data records. data_submissions = DataSubmission.query.filter_by( publication_recid=publication_recid, version=hep_submission.version ).order_by(DataSubmission.id.asc()) record_ids = [publication_recid] # list of record IDs for data_submission in data_submissions: record_ids.append(data_submission.associated_recid) same_information = {} for index, recid in enumerate(record_ids): if index == 0: updated_record_information = updated_inspire_record_information else: # Only update selected keys for data records. updated_record_information = { key: updated_inspire_record_information[key] for key in ( 'authors', 'creation_date', 'journal_info', 'collaborations' ) } record_information = get_record_by_id(recid) same_information[recid] = True for key, value in updated_record_information.items(): if key not in record_information or record_information[key] != value: log.debug('For recid {}, key {} has new value {}'.format(recid, key, value)) same_information[recid] = False update_record(recid, updated_record_information) break log.info('For recid {}, information needs to be updated: {}'.format(recid, str(not(same_information[recid])))) if all(same for same in same_information.values()): return 'No update needed' else: log.warning("Failed to retrieve publication information for Inspire record {0}".format(inspire_id)) return 'Invalid Inspire ID' if hep_submission.overall_status == 'finished': index_record_ids(record_ids) # index for Elasticsearch push_data_keywords(pub_ids=[recid]) if not TESTING: generate_dois_for_submission.delay(inspire_id=inspire_id) # update metadata stored in DataCite if send_email: record_information = get_record_by_id(publication_recid) notify_publication_update(hep_submission, record_information) # send email to all participants return 'Success'
[docs]@shared_task def update_records_info_since(date): """Update publication information from INSPIRE for all records updated *since* a certain date.""" inspire_ids = get_inspire_records_updated_since(date) for inspire_id in inspire_ids: update_record_info.delay(inspire_id) log.info('Sent task for Inspire ID {}'.format(inspire_id))
[docs]@shared_task def update_records_info_on(date): """Update publication information from INSPIRE for all records updated *on* a certain date.""" inspire_ids = get_inspire_records_updated_on(date) for inspire_id in inspire_ids: update_record_info.delay(inspire_id) log.info('Sent task for Inspire ID {}'.format(inspire_id))
[docs]@shared_task def update_all_records_info(): """Update publication information from INSPIRE for *all* records.""" inspire_ids = get_inspire_records_updated_since('1899-01-01') for inspire_id in inspire_ids: update_record_info.delay(inspire_id) log.info('Sent task for Inspire ID {}'.format(inspire_id))
[docs]def get_inspire_records_updated_since(date): """Returns all inspire records updated since YYYY-MM-DD or #int as number of days since today (1 = yesterday)""" return _get_inspire_records_updated('since', date)
[docs]def get_inspire_records_updated_on(date): """Returns all inspire records updated on YYYY-MM-DD or #int as number of days since today (1 = yesterday).""" return _get_inspire_records_updated('on', date)
def _get_inspire_records_updated(on_or_since, date): """Returns a list of Inspire IDs of records with HEPData modified on or since a certain date.""" specified_time = _get_time(date) log.info("Obtaining Inspire IDs of records updated {} {}.".format(on_or_since, specified_time.strftime('%Y-%m-%d'))) url = _get_url(1, specified_time, on_or_since) response = resilient_requests('get', url) response.raise_for_status() total_hits = response.json()['hits']['total'] total_pages = math.ceil(total_hits / RECORDS_PER_PAGE) log.info("{} records were updated {} {}.".format(total_hits, on_or_since, specified_time.strftime('%Y-%m-%d'))) ids = [] for page in range(1, total_pages + 1): log.debug("At page {}/{}.".format(page, total_pages)) url = _get_url(page, specified_time, on_or_since) response = resilient_requests('get', url) response.raise_for_status() for i, hit in enumerate(response.json()['hits']['hits']): ids += [hit['id']] return ids def _get_time(date): """Returns a datetime object from either a string YYYY-MM-DD or integer (interpreted as number of past days).""" date_is_int = (type(date) is int or type(date) is str and date.isdigit()) if date_is_int: specified_time = datetime.datetime.today() + datetime.timedelta(days=-abs(int(date))) else: specified_time = datetime.datetime.strptime(date, "%Y-%m-%d") return specified_time def _get_url(page, specified_time, on_or_since): """Returns an INSPIRE API query URL for records with HEPData modified on or since a certain date.""" size = RECORDS_PER_PAGE url = ('https://inspirehep.net/api/literature?sort=mostrecent&size={}&page={}&fields=control_number'.format(size, page) + '&q=external_system_identifiers.schema%3AHEPData%20and%20du%3A{}'.format(specified_time.strftime("%Y-%m-%d")) + ('' if on_or_since == 'on' else '%2B')) return url