Skip to content

Commit

Permalink
Fixes for Python 3.12
Browse files Browse the repository at this point in the history
  • Loading branch information
doko42 authored and v4hn committed Jul 4, 2024
1 parent 32f2980 commit d5a1b6d
Show file tree
Hide file tree
Showing 94 changed files with 3,122 additions and 2,977 deletions.
86 changes: 43 additions & 43 deletions test/test_rosbag/test/test_bag.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def test_opening_stream_works(self):

f = open('/tmp/test_opening_stream_works.bag', 'rb')
b = rosbag.Bag(f, 'r')
self.assert_(len(list(b.read_messages())) == 10)
self.assertTrue(len(list(b.read_messages())) == 10)
b.close()

def test_invalid_bag_arguments_fails(self):
Expand All @@ -77,20 +77,20 @@ def fn3(): rosbag.Bag(f, 'z')
def fn4(): rosbag.Bag(f, 'r', compression='foobar')
def fn5(): rosbag.Bag(f, 'r', chunk_threshold=-1000)
for fn in [fn1, fn2, fn3, fn4, fn5]:
self.failUnlessRaises(ValueError, fn)
self.assertRaises(ValueError, fn)

def test_io_on_close_fails(self):
def fn():
b = rosbag.Bag('/tmp/test_io_close_fails.bag', 'w')
b.close()
size = b.size()
self.failUnlessRaises(ValueError, fn)
self.assertRaises(ValueError, fn)

def test_write_invalid_message_fails(self):
def fn():
with rosbag.Bag('/tmp/test_write_invalid_message_fails.bag', 'w') as b:
b.write(None, None, None)
self.failUnlessRaises(ValueError, fn)
self.assertRaises(ValueError, fn)

def test_simple_write_uncompressed_works(self):
with rosbag.Bag('/tmp/test_simple_write_uncompressed_works.bag', 'w') as b:
Expand All @@ -104,10 +104,10 @@ def test_simple_write_uncompressed_works(self):

msgs = list(rosbag.Bag('/tmp/test_simple_write_uncompressed_works.bag').read_messages())

self.assert_(len(msgs) == msg_count, 'not all messages written: expected %d, got %d' % (msg_count, len(msgs)))
self.assertTrue(len(msgs) == msg_count, 'not all messages written: expected %d, got %d' % (msg_count, len(msgs)))

for (_, _, t1), (_, _, t2) in zip(msgs, msgs[1:]):
self.assert_(t1 < t2, 'messages returned unordered: got timestamp %s before %s' % (str(t1), str(t2)))
self.assertTrue(t1 < t2, 'messages returned unordered: got timestamp %s before %s' % (str(t1), str(t2)))

def test_writing_nonchronological_works(self):
with rosbag.Bag('/tmp/test_writing_nonchronological_works.bag', 'w') as b:
Expand All @@ -121,10 +121,10 @@ def test_writing_nonchronological_works(self):

msgs = list(rosbag.Bag('/tmp/test_writing_nonchronological_works.bag').read_messages())

self.assert_(len(msgs) == msg_count, 'not all messages written: expected %d, got %d' % (msg_count, len(msgs)))
self.assertTrue(len(msgs) == msg_count, 'not all messages written: expected %d, got %d' % (msg_count, len(msgs)))

for (_, _, t1), (_, _, t2) in zip(msgs, msgs[1:]):
self.assert_(t1 < t2, 'messages returned unordered: got timestamp %s before %s' % (str(t1), str(t2)))
self.assertTrue(t1 < t2, 'messages returned unordered: got timestamp %s before %s' % (str(t1), str(t2)))

def test_large_write_works(self):
for compression in [rosbag.Compression.NONE, rosbag.Compression.BZ2]:
Expand All @@ -139,10 +139,10 @@ def test_large_write_works(self):

msgs = list(rosbag.Bag('/tmp/test_large_write_works.bag').read_messages())

self.assertEquals(len(msgs), msg_count, 'not all messages written: expected %d, got %d' % (msg_count, len(msgs)))
self.assertEqual(len(msgs), msg_count, 'not all messages written: expected %d, got %d' % (msg_count, len(msgs)))

for (_, _, t1), (_, _, t2) in zip(msgs, msgs[1:]):
self.assert_(t1 < t2, 'messages returned unordered: got timestamp %s before %s' % (str(t1), str(t2)))
self.assertTrue(t1 < t2, 'messages returned unordered: got timestamp %s before %s' % (str(t1), str(t2)))

def test_get_messages_time_range_works(self):
with rosbag.Bag('/tmp/test_get_messages_time_range_works.bag', 'w') as b:
Expand All @@ -156,7 +156,7 @@ def test_get_messages_time_range_works(self):
end_time = genpy.Time.from_sec(7)
msgs = list(rosbag.Bag('/tmp/test_get_messages_time_range_works.bag').read_messages(topics='/ints', start_time=start_time, end_time=end_time))

self.assertEquals(len(msgs), 5)
self.assertEqual(len(msgs), 5)

def test_get_messages_filter_works(self):
with rosbag.Bag('/tmp/test_get_messages_filter_works.bag', 'w') as b:
Expand All @@ -169,7 +169,7 @@ def test_get_messages_filter_works(self):
def filter(topic, datatype, md5sum, msg_def, header):
return '5' in topic and datatype == Int32._type and md5sum == Int32._md5sum and msg_def == Int32._full_text

self.assertEquals(len(list(rosbag.Bag('/tmp/test_get_messages_filter_works.bag').read_messages(connection_filter=filter))), 3)
self.assertEqual(len(list(rosbag.Bag('/tmp/test_get_messages_filter_works.bag').read_messages(connection_filter=filter))), 3)

def test_rosbag_filter(self):
inbag_filename = '/tmp/test_rosbag_filter__1.bag'
Expand All @@ -188,7 +188,7 @@ def test_rosbag_filter(self):

msgs = list(rosbag.Bag(outbag_filename).read_messages())

self.assertEquals(len(msgs), 5)
self.assertEqual(len(msgs), 5)

# Regression test for issue #736
def test_trivial_rosbag_filter(self):
Expand All @@ -213,7 +213,7 @@ def test_trivial_rosbag_filter(self):
outbag1_md5 = hashlib.md5(h.read()).hexdigest()
with open(outbag2_filename, 'rb') as h:
outbag2_md5 = hashlib.md5(h.read()).hexdigest()
self.assertEquals(outbag1_md5, outbag2_md5)
self.assertEqual(outbag1_md5, outbag2_md5)
finally:
shutil.rmtree(tempdir)

Expand Down Expand Up @@ -344,10 +344,10 @@ def test_get_message_count(self):
bag.write("/test_bag/more", String(data='alone'))

with rosbag.Bag(fn) as bag:
self.assertEquals(bag.get_message_count(), 300)
self.assertEquals(bag.get_message_count(topic_filters='/test_bag'), 200)
self.assertEquals(bag.get_message_count(topic_filters=['/test_bag', '/test_bag/more']), 300)
self.assertEquals(bag.get_message_count(topic_filters=['/none']), 0)
self.assertEqual(bag.get_message_count(), 300)
self.assertEqual(bag.get_message_count(topic_filters='/test_bag'), 200)
self.assertEqual(bag.get_message_count(topic_filters=['/test_bag', '/test_bag/more']), 300)
self.assertEqual(bag.get_message_count(topic_filters=['/none']), 0)

def test_get_compression_info(self):
fn = '/tmp/test_get_compression_info.bag'
Expand All @@ -359,19 +359,19 @@ def test_get_compression_info(self):

with rosbag.Bag(fn) as bag:
info = bag.get_compression_info()
self.assertEquals(info.compression, rosbag.Compression.NONE)
self.assertEqual(info.compression, rosbag.Compression.NONE)
# 167 Bytes of overhead, 50 Bytes per Int32.
self.assertEquals(info.uncompressed, 5166)
self.assertEquals(info.compressed, 5166)
self.assertEqual(info.uncompressed, 5166)
self.assertEqual(info.compressed, 5166)

with rosbag.Bag(fn, mode='w', compression=rosbag.Compression.BZ2) as bag:
for i in range(100):
bag.write("/test_bag", Int32(data=i))

with rosbag.Bag(fn) as bag:
info = bag.get_compression_info()
self.assertEquals(info.compression, rosbag.Compression.BZ2)
self.assertEquals(info.uncompressed, 5166)
self.assertEqual(info.compression, rosbag.Compression.BZ2)
self.assertEqual(info.uncompressed, 5166)

# the value varies each run, I suspect based on rand, but seems
# to generally be around 960 to 980 on my comp
Expand All @@ -389,18 +389,18 @@ def test_get_time(self):
start_stamp = bag.get_start_time()
end_stamp = bag.get_end_time()

self.assertEquals(start_stamp, 0.0)
self.assertEquals(end_stamp, 99.0)
self.assertEqual(start_stamp, 0.0)
self.assertEqual(end_stamp, 99.0)

def test_get_time_empty_bag(self):
"""Test for issue #657"""
fn = '/tmp/test_get_time_empty_bag.bag'

with rosbag.Bag(fn, mode='w') as bag:
self.assertRaisesRegexp(rosbag.ROSBagException,
self.assertRaisesRegex(rosbag.ROSBagException,
'Bag contains no message',
bag.get_start_time)
self.assertRaisesRegexp(rosbag.ROSBagException,
self.assertRaisesRegex(rosbag.ROSBagException,
'Bag contains no message',
bag.get_end_time)

Expand All @@ -416,56 +416,56 @@ def test_get_type_and_topic_info(self):

with rosbag.Bag(fn) as bag:
msg_types, topics = bag.get_type_and_topic_info()
self.assertEquals(len(msg_types), 2)
self.assertEqual(len(msg_types), 2)
self.assertTrue("std_msgs/Int32" in msg_types)
self.assertTrue("std_msgs/String" in msg_types)
self.assertEquals(len(topics), 2)
self.assertEqual(len(topics), 2)
self.assertTrue(topic_1 in topics)
self.assertTrue(topic_2 in topics)

self.assertEquals(topics[topic_1].message_count, 200)
self.assertEquals(topics[topic_1].msg_type, "std_msgs/Int32")
self.assertEquals(topics[topic_2].message_count, 100)
self.assertEquals(topics[topic_2].msg_type, "std_msgs/String")
self.assertEqual(topics[topic_1].message_count, 200)
self.assertEqual(topics[topic_1].msg_type, "std_msgs/Int32")
self.assertEqual(topics[topic_2].message_count, 100)
self.assertEqual(topics[topic_2].msg_type, "std_msgs/String")

#filter on topic 1
msg_types, topics = bag.get_type_and_topic_info(topic_1)

# msg_types should be unaffected by the filter
self.assertEquals(len(msg_types), 2)
self.assertEqual(len(msg_types), 2)
self.assertTrue("std_msgs/Int32" in msg_types)
self.assertTrue("std_msgs/String" in msg_types)

self.assertEquals(len(topics), 1)
self.assertEqual(len(topics), 1)
self.assertTrue(topic_1 in topics)

self.assertEquals(topics[topic_1].message_count, 200)
self.assertEquals(topics[topic_1].msg_type, "std_msgs/Int32")
self.assertEqual(topics[topic_1].message_count, 200)
self.assertEqual(topics[topic_1].msg_type, "std_msgs/Int32")

#filter on topic 2
msg_types, topics = bag.get_type_and_topic_info(topic_2)

# msg_types should be unaffected by the filter
self.assertEquals(len(msg_types), 2)
self.assertEqual(len(msg_types), 2)
self.assertTrue("std_msgs/Int32" in msg_types)
self.assertTrue("std_msgs/String" in msg_types)

self.assertEquals(len(topics), 1)
self.assertEqual(len(topics), 1)
self.assertTrue(topic_2 in topics)

self.assertEquals(topics[topic_2].message_count, 100)
self.assertEquals(topics[topic_2].msg_type, "std_msgs/String")
self.assertEqual(topics[topic_2].message_count, 100)
self.assertEqual(topics[topic_2].msg_type, "std_msgs/String")

#filter on missing topic
msg_types, topics = bag.get_type_and_topic_info("/none")

# msg_types should be unaffected by the filter
self.assertEquals(len(msg_types), 2)
self.assertEqual(len(msg_types), 2)
self.assertTrue("std_msgs/Int32" in msg_types)
self.assertTrue("std_msgs/String" in msg_types)

# topics should be empty
self.assertEquals(len(topics), 0)
self.assertEqual(len(topics), 0)

def _print_bag_records(self, fn):
with open(fn) as f:
Expand Down
26 changes: 13 additions & 13 deletions test/test_rosgraph/test/test_rosgraph_masterapi_online.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,15 @@ def test_getPid(self):

def test_getUri(self):
val = self.m.getUri()
self.assert_(val.startswith('http://'))
self.assertTrue(val.startswith('http://'))

def test_lookupService(self):
uri = 'http://localhost:897'
rpcuri = 'rosrpc://localhost:9812'
self.m.registerService('/bar/service', rpcuri, uri)
self.assertEquals(rpcuri, self.m.lookupService('/bar/service'))
self.assertEqual(rpcuri, self.m.lookupService('/bar/service'))
try:
self.assertEquals(uri, self.m.lookupService('/fake/service'))
self.assertEqual(uri, self.m.lookupService('/fake/service'))
self.fail("should have thrown")
except rosgraph.masterapi.Error:
pass
Expand All @@ -70,16 +70,16 @@ def test_registerService(self):
def test_unregisterService(self):
self.m.registerService('/unreg_service/service', 'rosrpc://localhost:9812', 'http://localhost:893')
val = self.m.registerService('/unreg_service/service', 'rosrpc://localhost:9812', 'http://localhost:893')
self.assertEquals(1, val)
self.assertEqual(1, val)

def test_registerSubscriber(self):
val = self.m.registerSubscriber('/reg_sub/node', 'std_msgs/String', 'http://localhost:9812')
self.assertEquals([], val)
self.assertEqual([], val)

def test_unregisterSubscriber(self):
self.m.registerSubscriber('/reg_unsub/node', 'std_msgs/String', 'http://localhost:9812')
val = self.m.unregisterSubscriber('/reg_unsub/node', 'http://localhost:9812')
self.assertEquals(1, val)
self.assertEqual(1, val)

def test_registerPublisher(self):
val = self.m.registerPublisher('/reg_pub/topic', 'std_msgs/String', 'http://localhost:9812')
Expand All @@ -93,7 +93,7 @@ def test_lookupNode(self):
# register and lookup self
uri = 'http://localhost:12345'
self.m.registerPublisher('fake_topic', 'std_msgs/String', uri)
self.assertEquals(uri, self.m.lookupNode(_ID))
self.assertEqual(uri, self.m.lookupNode(_ID))

try:
self.m.lookupNode('/non/existent')
Expand All @@ -111,8 +111,8 @@ def test_getSystemState(self):
pub, sub, srvs = self.m.getSystemState()

def test_is_online(self):
self.assert_(rosgraph.masterapi.is_online())
self.assert_(self.m.is_online())
self.assertTrue(rosgraph.masterapi.is_online())
self.assertTrue(self.m.is_online())

def test_getParam(self):
try:
Expand All @@ -122,17 +122,17 @@ def test_getParam(self):
pass

def test_hasParam(self):
self.failIf(self.m.hasParam('fake_param'), "should have failed to lookup fake parameter")
self.assert_(self.m.hasParam('/run_id'), "should have failed to lookup fake parameter")
self.assertFalse(self.m.hasParam('fake_param'), "should have failed to lookup fake parameter")
self.assertTrue(self.m.hasParam('/run_id'), "should have failed to lookup fake parameter")

def test_setParam(self):
self.m.setParam('/foo', 1)

def test_searchParam(self):
self.assertEquals("/run_id", self.m.searchParam('run_id'))
self.assertEqual("/run_id", self.m.searchParam('run_id'))

def test_getParamNames(self):
self.assert_(type(self.m.getParamNames()) == list)
self.assertTrue(type(self.m.getParamNames()) == list)

if __name__ == '__main__':
rostest.rosrun('test_rosgrap', 'test_rosgraph_masterapi_online', MasterApiOnlineTest)
8 changes: 4 additions & 4 deletions test/test_roslaunch/test/env.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,14 @@
class TestEnv(unittest.TestCase):
def test_env(self):
if '--noenv' in sys.argv:
self.assertEquals(None, os.environ.get('TEST_ENV', None))
self.assertEquals(None, os.environ.get('TEST_ENV_SUBSTITUTION', None))
self.assertEqual(None, os.environ.get('TEST_ENV', None))
self.assertEqual(None, os.environ.get('TEST_ENV_SUBSTITUTION', None))
else:
self.assertEquals('test env', os.environ.get('TEST_ENV', None))
self.assertEqual('test env', os.environ.get('TEST_ENV', None))
rospack = rospkg.RosPack()
path1 = os.path.join(rospack.get_path('roslaunch'), 'src')
path2 = os.environ.get('TEST_ENV_SUBSTITUTION', None)
self.assertEquals(os.path.abspath(path1), os.path.abspath(path2))
self.assertEqual(os.path.abspath(path1), os.path.abspath(path2))

if __name__ == '__main__':
rostest.rosrun(PKG, NAME, TestEnv)
Expand Down
Loading

0 comments on commit d5a1b6d

Please sign in to comment.