1
#!/usr/bin/env python3
2

3
# Contest Management System - http://cms-dev.github.io/
4
# Copyright © 2016 Peyman Jabbarzade Ganje <peyman.jabarzade@gmail.com>
5
#
6
# This program is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU Affero General Public License as
8
# published by the Free Software Foundation, either version 3 of the
9
# License, or (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU Affero General Public License for more details.
15
#
16
# You should have received a copy of the GNU Affero General Public License
17
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
18

19 1
import logging
20 1
import zipfile
21

22 1
from cms.db import Testcase
23

24

25 1
logger = logging.getLogger(__name__)
26

27

28 1
def _pair_testcase_files(filenames, input_re, output_re):
    """Group archive member names into per-testcase input/output pairs.

    filenames (iterable of str): names of the members of the archive.
    input_re (re.Pattern): regular expression matching the input
        filenames; its first group is the testcase codename.
    output_re (re.Pattern): regular expression matching the output
        filenames; its first group is the testcase codename.

    return ({str: [str|None, str|None]}): for each codename, the input
        and the output filename found (None for a side that is missing).

    """
    tests = {}
    for filename in filenames:
        # The input pattern takes precedence: a name matching both
        # patterns is treated as an input file.
        slot = 0
        match = input_re.match(filename)
        if not match:
            slot = 1
            match = output_re.match(filename)
        if match:
            tests.setdefault(match.group(1), [None, None])[slot] = filename
    return tests


def import_testcases_from_zipfile(
        session, file_cacher, dataset,
        archive, input_re, output_re, overwrite, public):
    """Import testcases from a zipped archive

    Each testcase is committed on its own, so testcases imported before
    a failure stay in the database.

    session (Session): session to use to add the testcases.
    file_cacher (FileCacher): interface to access the files in the DB.
    dataset (Dataset): dataset where to add the testcases.
    archive (File): file-like object representing a zip file.
    input_re (re.Pattern): regular expression matching the input
        filenames (e.g., re.compile(r"input_(.*).txt")).
    output_re (re.Pattern): regular expression matching the output
        filenames (e.g., re.compile(r"output_(.*).txt")).
    overwrite (bool): whether to overwrite existing testcases.
    public (bool): whether to mark the new testcases as public.

    return ((str, str)): subject and text of a message describing
        the outcome of the operation.

    raise (Exception): if the archive is not a valid zip file, if a
        testcase file cannot be read from it, if storing a file fails
        or if committing a new testcase fails.

    """
    task_name = dataset.task.name
    skipped_tc = []
    overwritten_tc = []
    added_tc = []
    try:
        with zipfile.ZipFile(archive, "r") as archive_zfp:
            # Match input/output file names to testcases' codenames.
            tests = _pair_testcase_files(
                archive_zfp.namelist(), input_re, output_re)

            for codename, (input_name, output_name) in tests.items():
                # If input or output file isn't found, skip it.
                if not input_name or not output_name:
                    continue

                # Check, whether current testcase already exists.
                replacing = False
                if codename in dataset.testcases:
                    # If we are not allowed to overwrite, skip it.
                    if not overwrite:
                        skipped_tc.append(codename)
                        continue
                    # Otherwise remove the existing testcase first.
                    session.delete(dataset.testcases[codename])
                    try:
                        session.commit()
                    except Exception:
                        # Assuming a SQLAlchemy session: after a failed
                        # commit it must be rolled back before it can
                        # be used for the remaining testcases.
                        session.rollback()
                        skipped_tc.append(codename)
                        continue
                    replacing = True
                    overwritten_tc.append(codename)

                # Add current testcase.
                try:
                    input_ = archive_zfp.read(input_name)
                    output = archive_zfp.read(output_name)
                except Exception as err:
                    raise Exception(
                        "Reading testcase %s failed" % codename) from err
                try:
                    input_digest = file_cacher.put_file_content(
                        input_, "Testcase input for task %s" % task_name)
                    output_digest = file_cacher.put_file_content(
                        output, "Testcase output for task %s" % task_name)
                except Exception as err:
                    raise Exception("Testcase storage failed") from err

                testcase = Testcase(codename, public, input_digest,
                                    output_digest, dataset=dataset)
                session.add(testcase)
                try:
                    session.commit()
                except Exception as err:
                    raise Exception(
                        "Couldn't add test %s" % codename) from err
                # Overwritten testcases were already counted above.
                if not replacing:
                    added_tc.append(codename)
    except zipfile.BadZipFile as err:
        raise Exception(
            "The selected file is not a zip file. "
            "Please select a valid zip file.") from err

    return (
        "Successfully added %d and overwritten %d testcase(s)" %
        (len(added_tc), len(overwritten_tc)),
        "Added: %s; overwritten: %s; skipped: %s" %
        (", ".join(added_tc) if added_tc else "none",
         ", ".join(overwritten_tc) if overwritten_tc else "none",
         ", ".join(skipped_tc) if skipped_tc else "none"))

Read our documentation on viewing source code.

Loading