1 |
schorsch |
1.1 |
# -*- coding: utf-8 -*- |
2 |
|
|
from __future__ import division, print_function, unicode_literals |
3 |
|
|
|
4 |
|
|
import struct |
5 |
|
|
import operator |
6 |
|
|
import unittest |
7 |
|
|
from functools import reduce |
8 |
|
|
|
9 |
|
|
import testsupport as ts |
10 |
|
|
from pyradlib import lcompare |
11 |
|
|
from pyradlib.pyrad_proc import PIPE, Error, ProcMixin |
12 |
|
|
|
13 |
|
|
|
14 |
|
|
class PyradprocTestCase(unittest.TestCase, ProcMixin):
    """Exercise the call_one / call_two / call_many process helpers.

    Each test launches one or more external commands through the helper
    methods used in _runit() and compares the captured output with
    expected values via lcompare.lcompare().
    """

    def _runit(self, cmdl, actstrl, _in=None, indata=None, nl=False):
        """Run a command or a pipeline of commands and return its output.

        cmdl    -- either a single command (a list whose first element is a
                   string) or a list of such commands. One command goes
                   through call_one(), exactly two through call_two(), any
                   other count through call_many().
        actstrl -- forwarded to the call_*() helper; for two commands it is
                   indexed as actstrl[0] and actstrl[1].
        _in     -- forwarded unchanged as the `_in` keyword.
        indata  -- if truthy, written to the first process' stdin, which is
                   then closed; the result is read from the last process'
                   stdout. Only suitable for small amounts of data.
        nl      -- forwarded as `universal_newlines`. When true, the output
                   is split with lcompare.split_rad() and empty records are
                   dropped.

        Returns the raw output of the last process when `nl` is false,
        otherwise a list of the non-empty records.

        An Error raised by the helpers propagates to the caller
        (test_call_1_noexe depends on that). All processes that were
        started are waited for in either case.
        """
        procs = []
        try:
            if isinstance(cmdl[0], (type(b''), type(u''))):
                # first element is a string -> cmdl is one single command
                proc = self.call_one(cmdl, actstrl, _in=_in, out=PIPE,
                        universal_newlines=nl)
                procs = [proc]
            elif len(cmdl) == 2:
                procs = self.call_two(cmdl[0], cmdl[1],
                        actstrl[0], actstrl[1],
                        _in=_in, out=PIPE, universal_newlines=nl)
            else:
                procs = self.call_many(cmdl, actstrl, _in=_in, out=PIPE,
                        universal_newlines=nl)
            if indata: # only for small amounts of data
                procs[0].stdin.write(indata)
                procs[0].stdin.close()
                res = procs[-1].stdout.read()
            else:
                res, eres = procs[-1].communicate()
                if eres: print('Errors', eres) # XXX
        finally:
            # reap whatever was started, even when an Error is on its way up
            for proc in procs:
                proc.wait()
        if nl:
            # Build a real list: on Python 3 filter() would hand back a
            # one-shot iterator, while callers pass the result on to
            # lcompare.lcompare() and print it on failure.
            return [rec for rec in lcompare.split_rad(res) if rec]
        return res

    def test_call_1_text(self):
        """One process, text output: `cnt n` must yield 0 .. n-1."""
        for n in (5, 10, 55, 200,1328,1329,1330,1331,1332, 2000, 5000,99999):
            exp = range(n)
            cntcmd = ['cnt', str(n)]
            res = self._runit(cntcmd, 'run one process', nl=True)
            # only the first field of each record is compared
            resl = [r[0] for r in res]
            try: lcompare.lcompare(resl, exp)
            except lcompare.error as e:
                print(resl, exp)
                self.fail(('call_one_text n=%d -- ' % n) +str(e))

    def test_call_1_bin(self):
        """One process, binary output: `total -of` fed from histo.dat."""
        exp = (2160, 8640, 4320, 1080, 7560)
        cntcmd = ['total', '-of']
        tfn = ts.datafile('histo.dat')
        res = self._runit(cntcmd, 'run one process', _in=tfn, nl=False)
        # output is unpacked as one native float per expected value
        resl = struct.unpack('f'*len(exp), res)
        try: lcompare.lcompare(resl, exp)
        except lcompare.error as e:
            # there is no loop variable in this test to report
            self.fail('call_one_bin -- ' +str(e))

    def test_call_1_noexe(self):
        """Starting a nonexistent executable must raise Error."""
        # let's hope a program with that name doesn't exist
        cmd = ['iswaxubdzkcgrhmltqvyfepojn', '-x', '-y', '-z']
        self.assertRaises(Error, self._runit, cmd, 'run one process')

    def test_call_2_text(self):
        """Two processes, text output: `cnt n` piped into `total`."""
        # with values higher than 44721, total will return a float in e-format.
        for n in (5, 10, 55, 200,1328,1329,1330,1331,1332, 2000, 5000,
                44721, 44722, 99999):
            # sum of 0 .. n-1
            exp = [reduce(operator.add, range(n))]
            cntcmd = ['cnt', str(n)]
            totcmd = ['total']
            resl = self._runit([cntcmd, totcmd], ['run p1','run p2'], nl=True)
            # the result is a list of records, hence the extra nesting
            try: lcompare.lcompare(resl, [exp])
            except lcompare.error as e:
                self.fail(('call_two_text n=%d -- ' % n) +str(e))

    def test_call_2_bin(self):
        """Two processes, binary output: `cnt ...` piped into `total -of`."""
        for nn in ((1,1,1), (5,5,5), (3,9,5,2,8)):
            totalrows = reduce(operator.mul, nn)
            # per column: sum of 0 .. n-1, scaled by totalrows/n
            exp = [reduce(operator.add, range(n))*(totalrows/n) for n in nn]
            cntcmd = ['cnt'] + [str(n) for n in nn]
            totcmd = ['total', '-of']
            res = self._runit([cntcmd, totcmd], ['run p1','run p2'])
            resl = struct.unpack('f'*len(exp), res)
            try: lcompare.lcompare(resl, exp)
            except lcompare.error as e:
                # report the tuple under test; the comprehension variable
                # `n` above is not available here on Python 3
                self.fail(('call_two_bin nn=%s -- ' % (nn,)) +str(e))

    def test_call_many_text(self):
        """Four chained `xform` processes, text output."""
        # NOTE(review): `data` is bytes while the pipeline is started with
        # universal_newlines=True; whether stdin accepts bytes in that mode
        # depends on how call_many() opens the pipe -- confirm on Python 3.
        data = b'''void polygon test 0 0 9 0 0 0 1 0 0 0 1 0'''
        exp = (['#', 'xform', '-s', 0.5],
            ['#', 'xform', '-t', 0, 0, 2],
            ['#', 'xform', '-ry', -20],
            ['#', 'xform', '-rx', 30],
            ['void', 'polygon', 'test'],
            [0], [0], [9],
            [0, 0, 1],
            [0.469846310393, 0, 1.17101007167],
            [-0.0855050358315, 0.433012701892, 1.23492315519])
        cmd0 = ['xform', '-rx', '30']
        cmd1 = ['xform', '-ry', '-20']
        cmd2 = ['xform', '-t', '0', '0', '2']
        cmd3 = ['xform', '-s', '0.5']
        cmdl = [cmd0, cmd1, cmd2, cmd3]
        resl = self._runit(cmdl, 'run many processes', indata=data,
                _in=PIPE, nl=True)
        try: lcompare.lcompare(resl, exp)
        except lcompare.error as e:
            print(resl, exp)
            # there is no loop variable in this test to report
            self.fail('call_many_text -- ' +str(e))

    def test_call_many_bin(self):
        """Three chained processes, binary output: cnt | histo | total -of."""
        exp = (15, 40, 45, 48, 0)
        cmd0 = ['cnt', '3', '4', '5', '1']
        cmd1 = ['histo', '1', '5']
        cmd2 = ['total', '-of']
        cmdl = [cmd0, cmd1, cmd2]
        res = self._runit(cmdl, 'run many processes')
        resl = struct.unpack('f'*len(exp), res)
        try: lcompare.lcompare(resl, exp)
        except lcompare.error as e:
            print(resl, exp)
            self.fail('call_many_bin -- ' +str(e))
131 |
|
|
|
132 |
|
|
|
133 |
|
|
# vi: set ts=4 sw=4 : |