# Source code for hepdata.modules.email.utils

# This file is part of HEPData.
# Copyright (C) 2016 CERN.
#
# HEPData is free software; you can redistribute it
# and/or modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; either version 2 of the
# License, or (at your option) any later version.
#
# HEPData is distributed in the hope that it will be
# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with HEPData; if not, write to the
# Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston,
# MA 02111-1307, USA.
#
# In applying this license, CERN does not
# waive the privileges and immunities granted to it by virtue of its status
# as an Intergovernmental Organization or submit itself to any jurisdiction.


"""Provides high-level common email utilities."""

from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from smtplib import SMTP, SMTPRecipientsRefused

from celery import shared_task
from flask import current_app
from flask_celeryext import create_celery_app


def create_send_email_task(destination, subject, message, reply_to_address=None):
    """
    Queue an email for asynchronous delivery, or preview it when testing.

    When the application's ``TESTING`` setting is true, nothing is sent:
    the message is printed with its markup tags removed and runs of blank
    lines collapsed. Otherwise the ``send_email`` task is queued with the
    given arguments.

    :param destination: recipient address(es), passed on to ``send_email``.
    :param subject: subject line, passed on to ``send_email``.
    :param message: body of the email; markup tags are stripped for the
        testing preview.
    :param reply_to_address: optional address, passed on to ``send_email``.
    :return: None
    """
    if current_app.config.get('TESTING', False):
        import re

        print('Not sending email as TESTING=True; would have sent email to {0}:'.format(destination))
        # Strip anything that looks like a tag, then squeeze the blank
        # lines left behind so the preview stays readable.
        tag_pattern = re.compile('(?s)<.*?>')
        blank_run_pattern = re.compile(r'(?ms)(\n(\s*)){2,}')
        text_only = tag_pattern.sub('', message)
        print(blank_run_pattern.sub('\n', text_only))
        return

    # this is required for some unknown reason due to an initialisation problem with celery.
    create_celery_app(current_app)
    print('Sending email to {0}'.format(destination))
    send_email.delay(destination, subject, message, reply_to_address)
@shared_task
def send_email(destination, subject, message, reply_to_address=None):
    """
    Build an HTML email and deliver it over SMTP.

    The ``ADMIN_EMAIL`` address is always added to the envelope recipients,
    although it is not listed in the ``To`` header. The envelope sender is
    always ``MAIL_DEFAULT_SENDER``.

    :param destination: comma-separated string of recipient addresses.
    :param subject: text used for the ``Subject`` header.
    :param message: HTML body of the email (sent as UTF-8).
    :param reply_to_address: optional address used for the ``From`` header;
        ``MAIL_DEFAULT_SENDER`` is used when it is empty or ``None``.
    :raises Exception: any error other than ``SMTPRecipientsRefused`` is
        re-raised after a notice is printed. ``SMTPRecipientsRefused`` is
        reported through ``send_error_mail`` instead.
    """
    # Local import: only the connection clean-up below needs this name.
    from smtplib import SMTPServerDisconnected

    try:
        config = current_app.config

        # Build the whole message before opening a connection, so a failure
        # here cannot leave an SMTP session dangling.
        mmp_msg = MIMEMultipart('alternative')
        mmp_msg['Subject'] = subject
        mmp_msg['From'] = reply_to_address or config['MAIL_DEFAULT_SENDER']
        mmp_msg['To'] = destination
        mmp_msg.attach(MIMEText(message, 'html', 'utf-8'))

        # Tolerate "a@x, b@y" and trailing commas: strip whitespace and drop
        # empty entries so no blank address is handed to the SMTP server.
        recipients = [
            address.strip()
            for address in destination.split(',')
            if address.strip()
        ]
        recipients.append(config['ADMIN_EMAIL'])

        connection = connect()
        try:
            connection.send_message(
                mmp_msg, config['MAIL_DEFAULT_SENDER'], recipients
            )
        finally:
            # Always end the session, even when sending failed. If the
            # server has already dropped the link, QUIT cannot be sent, so
            # just release the socket.
            try:
                connection.quit()
            except SMTPServerDisconnected:
                connection.close()
    except SMTPRecipientsRefused as smtp_error:
        # NOTE(review): the error report is itself sent as an email; if its
        # recipient were refused too this could repeat - confirm acceptable.
        send_error_mail(smtp_error)
    except Exception:
        print('Exception occurred.')
        raise
def send_error_mail(exception):
    """
    Report a failed delivery by emailing the system's default address.

    The address is read from the ``SECURITY_EMAIL_SENDER`` setting (which is
    expected to always be valid) and the email is scheduled through
    ``create_send_email_task``.

    :param exception: the error to report, e.g. an ``SMTPRecipientsRefused``
        instance; its string form becomes the body of the email.
    """
    # The system's own sender address doubles as the error-report recipient.
    fallback_recipient = current_app.config['SECURITY_EMAIL_SENDER']
    error_body = str(exception)
    create_send_email_task(
        fallback_recipient,
        '[HEPData Error] Error sending email',
        error_body,
    )
def connect():
    """
    Open an SMTP connection using the application's mail settings.

    Connects to ``MAIL_SERVER`` on ``MAIL_PORT``. Unless ``SMTP_NO_PASSWORD``
    is set, the session is upgraded with STARTTLS when ``SMTP_ENCRYPTION`` is
    set, and then authenticated with ``MAIL_USERNAME`` / ``MAIL_PASSWORD``.

    :return: the connected ``smtplib.SMTP`` instance; it is not closed here,
        so the caller is responsible for ending the session.
    """
    server = SMTP()
    settings = current_app.config
    server.connect(settings['MAIL_SERVER'], settings['MAIL_PORT'])

    # Servers that accept mail without credentials need no further set-up.
    if settings['SMTP_NO_PASSWORD']:
        return server

    # Encrypt first (when configured) so the credentials are not sent in
    # the clear.
    if settings['SMTP_ENCRYPTION']:
        server.starttls()
    server.login(settings['MAIL_USERNAME'], settings['MAIL_PASSWORD'])
    return server