]> git.k1024.org Git - pyxattr.git/blob - test/test_xattr.py
Test writing large values
[pyxattr.git] / test / test_xattr.py
1 #
2 #
3
4 import sys
5 import unittest
6 import tempfile
7 import os
8 import errno
9
10 import xattr
11 from xattr import NS_USER, XATTR_CREATE, XATTR_REPLACE
12
13 if sys.hexversion >= 0x03000000:
14     PY3K = True
15     EMPTY_NS = bytes()
16 else:
17     PY3K = False
18     EMPTY_NS = ''
19
20 TEST_DIR = os.environ.get("TEST_DIR", ".")
21 TEST_IGNORE_XATTRS = os.environ.get("TEST_IGNORE_XATTRS", "")
22 if TEST_IGNORE_XATTRS == "":
23     TEST_IGNORE_XATTRS = []
24 else:
25     TEST_IGNORE_XATTRS = TEST_IGNORE_XATTRS.split(",")
26     # The following has to be a list comprehension, not a generator, to
27     # avoid weird consequences of lazy evaluation.
28     TEST_IGNORE_XATTRS.extend([a.encode() for a in TEST_IGNORE_XATTRS])
29
30 class xattrTest(unittest.TestCase):
31     USER_NN = "test"
32     USER_ATTR = NS_USER.decode() + "." + USER_NN
33     USER_VAL = "abc"
34     EMPTY_VAL = ""
35     LARGE_VAL = "x" * 2048
36     MANYOPS_COUNT = 131072
37
38     if PY3K:
39         USER_NN = USER_NN.encode()
40         USER_VAL = USER_VAL.encode()
41         USER_ATTR = USER_ATTR.encode()
42         EMPTY_VAL = EMPTY_VAL.encode()
43         LARGE_VAL = LARGE_VAL.encode()
44
45     @staticmethod
46     def _ignore_tuples(attrs):
47         """Remove ignored attributes from the output of xattr.get_all."""
48         return [attr for attr in attrs
49                 if attr[0] not in TEST_IGNORE_XATTRS]
50
51     @staticmethod
52     def _ignore(attrs):
53         """Remove ignored attributes from the output of xattr.list"""
54         return [attr for attr in attrs
55                 if attr not in TEST_IGNORE_XATTRS]
56
57     def checkList(self, attrs, value):
58         """Helper to check list equivalence, skipping TEST_IGNORE_XATTRS."""
59         self.assertEqual(self._ignore(attrs), value)
60
61     def checkTuples(self, attrs, value):
62         """Helper to check list equivalence, skipping TEST_IGNORE_XATTRS."""
63         self.assertEqual(self._ignore_tuples(attrs), value)
64
65     def setUp(self):
66         """set up function"""
67         self.rmfiles = []
68         self.rmdirs = []
69
70     def tearDown(self):
71         """tear down function"""
72         for fname in self.rmfiles:
73             try:
74                 os.unlink(fname)
75             except EnvironmentError:
76                 continue
77         for dname in self.rmdirs:
78             try:
79                 os.rmdir(dname)
80             except EnvironmentError:
81                 continue
82
83     def _getfile(self):
84         """create a temp file"""
85         fh, fname = tempfile.mkstemp(".test", "xattr-", TEST_DIR)
86         self.rmfiles.append(fname)
87         return fh, fname
88
89     def _getdir(self):
90         """create a temp dir"""
91         dname = tempfile.mkdtemp(".test", "xattr-", TEST_DIR)
92         self.rmdirs.append(dname)
93         return dname
94
95     def _getsymlink(self, dangling=True):
96         """create a symlink"""
97         fh, fname = self._getfile()
98         os.close(fh)
99         if dangling:
100             os.unlink(fname)
101         sname = fname + ".symlink"
102         os.symlink(fname, sname)
103         self.rmfiles.append(sname)
104         return fname, sname
105
106     def _checkDeprecated(self, item, symlink=False):
107         """check deprecated list, set, get operations against an item"""
108         self.checkList(xattr.listxattr(item, symlink), [])
109         self.assertRaises(EnvironmentError, xattr.setxattr, item,
110                           self.USER_ATTR, self.USER_VAL,
111                           XATTR_REPLACE)
112         try:
113             xattr.setxattr(item, self.USER_ATTR, self.USER_VAL, 0, symlink)
114         except IOError:
115             err = sys.exc_info()[1]
116             if symlink and (err.errno == errno.EPERM or
117                             err.errno == errno.ENOENT):
118                 # symlinks may fail, in which case we abort the rest
119                 # of the test for this case (Linux returns EPERM; OS X
120                 # returns ENOENT)
121                 return
122             raise
123         self.assertRaises(EnvironmentError, xattr.setxattr, item,
124                           self.USER_ATTR, self.USER_VAL, XATTR_CREATE)
125         self.checkList(xattr.listxattr(item, symlink), [self.USER_ATTR])
126         self.assertEqual(xattr.getxattr(item, self.USER_ATTR, symlink),
127                          self.USER_VAL)
128         self.checkTuples(xattr.get_all(item, nofollow=symlink),
129                           [(self.USER_ATTR, self.USER_VAL)])
130         xattr.removexattr(item, self.USER_ATTR)
131         self.checkList(xattr.listxattr(item, symlink), [])
132         self.checkTuples(xattr.get_all(item, nofollow=symlink),
133                          [])
134         self.assertRaises(EnvironmentError, xattr.removexattr,
135                           item, self.USER_ATTR)
136
137     def _checkListSetGet(self, item, symlink=False, use_ns=False):
138         """check list, set, get operations against an item"""
139         self.checkList(xattr.list(item, symlink), [])
140         self.assertRaises(EnvironmentError, xattr.set, item,
141                           self.USER_ATTR, self.USER_VAL, flags=XATTR_REPLACE)
142         self.assertRaises(EnvironmentError, xattr.set, item,
143                           self.USER_NN, self.USER_VAL, flags=XATTR_REPLACE,
144                           namespace=NS_USER)
145         try:
146             if use_ns:
147                 xattr.set(item, self.USER_NN, self.USER_VAL,
148                           namespace=NS_USER,
149                           nofollow=symlink)
150             else:
151                 xattr.set(item, self.USER_ATTR, self.USER_VAL,
152                           nofollow=symlink)
153         except IOError:
154             err = sys.exc_info()[1]
155             if symlink and (err.errno == errno.EPERM or
156                             err.errno == errno.ENOENT):
157                 # symlinks may fail, in which case we abort the rest
158                 # of the test for this case (Linux returns EPERM; OS X
159                 # returns ENOENT)
160                 return
161             raise
162         self.assertRaises(EnvironmentError, xattr.set, item,
163                           self.USER_ATTR, self.USER_VAL, flags=XATTR_CREATE)
164         self.assertRaises(EnvironmentError, xattr.set, item,
165                           self.USER_NN, self.USER_VAL,
166                           flags=XATTR_CREATE, namespace=NS_USER)
167         self.checkList(xattr.list(item, nofollow=symlink), [self.USER_ATTR])
168         self.checkList(xattr.list(item, nofollow=symlink,
169                                    namespace=EMPTY_NS),
170                          [self.USER_ATTR])
171         self.assertEqual(xattr.list(item, namespace=NS_USER, nofollow=symlink),
172                          [self.USER_NN])
173         self.assertEqual(xattr.get(item, self.USER_ATTR, nofollow=symlink),
174                          self.USER_VAL)
175         self.assertEqual(xattr.get(item, self.USER_NN, nofollow=symlink,
176                                    namespace=NS_USER), self.USER_VAL)
177         self.checkTuples(xattr.get_all(item, nofollow=symlink),
178                          [(self.USER_ATTR, self.USER_VAL)])
179         self.assertEqual(xattr.get_all(item, nofollow=symlink,
180                                        namespace=NS_USER),
181                          [(self.USER_NN, self.USER_VAL)])
182         if use_ns:
183             xattr.remove(item, self.USER_NN, namespace=NS_USER)
184         else:
185             xattr.remove(item, self.USER_ATTR)
186         self.checkList(xattr.list(item, symlink), [])
187         self.checkTuples(xattr.get_all(item, nofollow=symlink),
188                          [])
189         self.assertRaises(EnvironmentError, xattr.remove,
190                           item, self.USER_ATTR, nofollow=symlink)
191         self.assertRaises(EnvironmentError, xattr.remove, item,
192                           self.USER_NN, namespace=NS_USER, nofollow=symlink)
193
194     def testNoXattrDeprecated(self):
195         """test no attributes (deprecated functions)"""
196         fh, fname = self._getfile()
197         self.checkList(xattr.listxattr(fname), [])
198         self.checkTuples(xattr.get_all(fname), [])
199         self.assertRaises(EnvironmentError, xattr.getxattr, fname,
200                               self.USER_ATTR)
201         dname = self._getdir()
202         self.checkList(xattr.listxattr(dname), [])
203         self.checkTuples(xattr.get_all(dname), [])
204         self.assertRaises(EnvironmentError, xattr.getxattr, dname,
205                               self.USER_ATTR)
206         _, sname = self._getsymlink()
207         self.checkList(xattr.listxattr(sname, True), [])
208         self.checkTuples(xattr.get_all(sname, nofollow=True), [])
209         self.assertRaises(EnvironmentError, xattr.getxattr, fname,
210                               self.USER_ATTR, True)
211
212
213     def testNoXattr(self):
214         """test no attributes"""
215         fh, fname = self._getfile()
216         self.checkList(xattr.list(fname), [])
217         self.assertEqual(xattr.list(fname, namespace=NS_USER), [])
218         self.checkTuples(xattr.get_all(fname), [])
219         self.assertEqual(xattr.get_all(fname, namespace=NS_USER), [])
220         self.assertRaises(EnvironmentError, xattr.get, fname,
221                               self.USER_NN, namespace=NS_USER)
222         dname = self._getdir()
223         self.checkList(xattr.list(dname), [])
224         self.assertEqual(xattr.list(dname, namespace=NS_USER), [])
225         self.checkTuples(xattr.get_all(dname), [])
226         self.assertEqual(xattr.get_all(dname, namespace=NS_USER), [])
227         self.assertRaises(EnvironmentError, xattr.get, dname,
228                               self.USER_NN, namespace=NS_USER)
229         _, sname = self._getsymlink()
230         self.checkList(xattr.list(sname, nofollow=True), [])
231         self.assertEqual(xattr.list(sname, nofollow=True,
232                                     namespace=NS_USER), [])
233         self.checkTuples(xattr.get_all(sname, nofollow=True), [])
234         self.assertEqual(xattr.get_all(sname, nofollow=True,
235                                            namespace=NS_USER), [])
236         self.assertRaises(EnvironmentError, xattr.get, sname,
237                               self.USER_NN, namespace=NS_USER, nofollow=True)
238
239     def testFileByNameDeprecated(self):
240         """test set and retrieve one attribute by file name (deprecated)"""
241         fh, fname = self._getfile()
242         self._checkDeprecated(fname)
243         os.close(fh)
244
245     def testFileByName(self):
246         """test set and retrieve one attribute by file name"""
247         fh, fname = self._getfile()
248         self._checkListSetGet(fname)
249         self._checkListSetGet(fname, use_ns=True)
250         os.close(fh)
251
252     def testFileByDescriptorDeprecated(self):
253         """test file descriptor operations (deprecated functions)"""
254         fh, fname = self._getfile()
255         self._checkDeprecated(fh)
256         os.close(fh)
257
258     def testFileByDescriptor(self):
259         """test file descriptor operations"""
260         fh, fname = self._getfile()
261         self._checkListSetGet(fh)
262         self._checkListSetGet(fh, use_ns=True)
263         os.close(fh)
264
265     def testFileByObjectDeprecated(self):
266         """test file descriptor operations (deprecated functions)"""
267         fh, fname = self._getfile()
268         fo = os.fdopen(fh)
269         self._checkDeprecated(fo)
270         fo.close()
271
272     def testFileByObject(self):
273         """test file descriptor operations"""
274         fh, fname = self._getfile()
275         fo = os.fdopen(fh)
276         self._checkListSetGet(fo)
277         self._checkListSetGet(fo, use_ns=True)
278         fo.close()
279
280     def testMixedAccessDeprecated(self):
281         """test mixed access to file (deprecated functions)"""
282         fh, fname = self._getfile()
283         fo = os.fdopen(fh)
284         self.checkList(xattr.listxattr(fname), [])
285         xattr.setxattr(fname, self.USER_ATTR, self.USER_VAL)
286         self.checkList(xattr.listxattr(fh), [self.USER_ATTR])
287         self.assertEqual(xattr.getxattr(fo, self.USER_ATTR), self.USER_VAL)
288         self.checkTuples(xattr.get_all(fo), [(self.USER_ATTR, self.USER_VAL)])
289         self.checkTuples(xattr.get_all(fname),
290                          [(self.USER_ATTR, self.USER_VAL)])
291         fo.close()
292
293     def testMixedAccess(self):
294         """test mixed access to file"""
295         fh, fname = self._getfile()
296         fo = os.fdopen(fh)
297         self.checkList(xattr.list(fname), [])
298         xattr.set(fname, self.USER_ATTR, self.USER_VAL)
299         self.checkList(xattr.list(fh), [self.USER_ATTR])
300         self.assertEqual(xattr.list(fh, namespace=NS_USER), [self.USER_NN])
301         self.assertEqual(xattr.get(fo, self.USER_ATTR), self.USER_VAL)
302         self.assertEqual(xattr.get(fo, self.USER_NN, namespace=NS_USER),
303                          self.USER_VAL)
304         self.checkTuples(xattr.get_all(fo),
305                          [(self.USER_ATTR, self.USER_VAL)])
306         self.assertEqual(xattr.get_all(fo, namespace=NS_USER),
307                          [(self.USER_NN, self.USER_VAL)])
308         self.checkTuples(xattr.get_all(fname),
309                          [(self.USER_ATTR, self.USER_VAL)])
310         self.assertEqual(xattr.get_all(fname, namespace=NS_USER),
311                          [(self.USER_NN, self.USER_VAL)])
312         fo.close()
313
314     def testDirOpsDeprecated(self):
315         """test attribute setting on directories (deprecated functions)"""
316         dname = self._getdir()
317         self._checkDeprecated(dname)
318
319     def testDirOps(self):
320         """test attribute setting on directories"""
321         dname = self._getdir()
322         self._checkListSetGet(dname)
323         self._checkListSetGet(dname, use_ns=True)
324
325     def testSymlinkOpsDeprecated(self):
326         """test symlink operations (deprecated functions)"""
327         _, sname = self._getsymlink()
328         self.assertRaises(EnvironmentError, xattr.listxattr, sname)
329         self._checkDeprecated(sname, symlink=True)
330         target, sname = self._getsymlink(dangling=False)
331         xattr.setxattr(target, self.USER_ATTR, self.USER_VAL)
332         self.checkList(xattr.listxattr(target), [self.USER_ATTR])
333         self.checkList(xattr.listxattr(sname, True), [])
334         self.assertRaises(EnvironmentError, xattr.removexattr, sname,
335                           self.USER_ATTR, True)
336         xattr.removexattr(sname, self.USER_ATTR, False)
337
338     def testSymlinkOps(self):
339         """test symlink operations"""
340         _, sname = self._getsymlink()
341         self.assertRaises(EnvironmentError, xattr.list, sname)
342         self._checkListSetGet(sname, symlink=True)
343         self._checkListSetGet(sname, symlink=True, use_ns=True)
344         target, sname = self._getsymlink(dangling=False)
345         xattr.set(target, self.USER_ATTR, self.USER_VAL)
346         self.checkList(xattr.list(target), [self.USER_ATTR])
347         self.checkList(xattr.list(sname, nofollow=True), [])
348         self.assertRaises(EnvironmentError, xattr.remove, sname,
349                           self.USER_ATTR, nofollow=True)
350         xattr.remove(sname, self.USER_ATTR, nofollow=False)
351
352     def testBinaryPayloadDeprecated(self):
353         """test binary values (deprecated functions)"""
354         fh, fname = self._getfile()
355         os.close(fh)
356         BINVAL = "abc" + '\0' + "def"
357         if PY3K:
358             BINVAL = BINVAL.encode()
359         xattr.setxattr(fname, self.USER_ATTR, BINVAL)
360         self.checkList(xattr.listxattr(fname), [self.USER_ATTR])
361         self.assertEqual(xattr.getxattr(fname, self.USER_ATTR), BINVAL)
362         self.checkTuples(xattr.get_all(fname), [(self.USER_ATTR, BINVAL)])
363         xattr.removexattr(fname, self.USER_ATTR)
364
365     def testBinaryPayload(self):
366         """test binary values"""
367         fh, fname = self._getfile()
368         os.close(fh)
369         BINVAL = "abc" + '\0' + "def"
370         if PY3K:
371             BINVAL = BINVAL.encode()
372         xattr.set(fname, self.USER_ATTR, BINVAL)
373         self.checkList(xattr.list(fname), [self.USER_ATTR])
374         self.assertEqual(xattr.list(fname, namespace=NS_USER), [self.USER_NN])
375         self.assertEqual(xattr.get(fname, self.USER_ATTR), BINVAL)
376         self.assertEqual(xattr.get(fname, self.USER_NN,
377                                    namespace=NS_USER), BINVAL)
378         self.checkTuples(xattr.get_all(fname), [(self.USER_ATTR, BINVAL)])
379         self.assertEqual(xattr.get_all(fname, namespace=NS_USER),
380                          [(self.USER_NN, BINVAL)])
381         xattr.remove(fname, self.USER_ATTR)
382
383     def testManyOpsDeprecated(self):
384         """test many ops (deprecated functions)"""
385         fh, fname = self._getfile()
386         xattr.setxattr(fh, self.USER_ATTR, self.USER_VAL)
387         VL = [self.USER_ATTR]
388         for i in range(self.MANYOPS_COUNT):
389             self.checkList(xattr.listxattr(fh), VL)
390         for i in range(self.MANYOPS_COUNT):
391             self.assertEqual(xattr.getxattr(fh, self.USER_ATTR), self.USER_VAL)
392         for i in range(self.MANYOPS_COUNT):
393             self.checkTuples(xattr.get_all(fh),
394                              [(self.USER_ATTR, self.USER_VAL)])
395
396     def testManyOps(self):
397         """test many ops"""
398         fh, fname = self._getfile()
399         xattr.set(fh, self.USER_ATTR, self.USER_VAL)
400         VL = [self.USER_ATTR]
401         VN = [self.USER_NN]
402         for i in range(self.MANYOPS_COUNT):
403             self.checkList(xattr.list(fh), VL)
404             self.checkList(xattr.list(fh, namespace=EMPTY_NS), VL)
405             self.assertEqual(xattr.list(fh, namespace=NS_USER), VN)
406         for i in range(self.MANYOPS_COUNT):
407             self.assertEqual(xattr.get(fh, self.USER_ATTR), self.USER_VAL)
408             self.assertEqual(xattr.get(fh, self.USER_NN, namespace=NS_USER),
409                              self.USER_VAL)
410         for i in range(self.MANYOPS_COUNT):
411             self.checkTuples(xattr.get_all(fh),
412                              [(self.USER_ATTR, self.USER_VAL)])
413             self.assertEqual(xattr.get_all(fh, namespace=NS_USER),
414                              [(self.USER_NN, self.USER_VAL)])
415
416     def testNoneNamespace(self):
417         fh, fname = self._getfile()
418         self.assertRaises(TypeError, xattr.get, fh, self.USER_ATTR,
419                           namespace=None)
420
421     def testEmptyValue(self):
422         fh, fname = self._getfile()
423         xattr.set(fh, self.USER_ATTR, self.EMPTY_VAL)
424         self.assertEqual(xattr.get(fh, self.USER_ATTR), self.EMPTY_VAL)
425
426     def testWrongType(self):
427         self.assertRaises(TypeError, xattr.get, object(), self.USER_ATTR)
428
429     def testLargeAttribute(self):
430         fh, fname = self._getfile()
431
432         xattr.set(fh, self.USER_ATTR, self.LARGE_VAL)
433         self.assertEqual(xattr.get(fh, self.USER_ATTR), self.LARGE_VAL)
434
435
436 if __name__ == "__main__":
437     unittest.main()