class NaiveReactor:
    """Toy event loop that polls every transport in turn."""

    def run(self):
        """Poll ``self.transports`` repeatedly while ``self._run`` is true.

        Each pass calls ``read()`` on every transport and then sleeps
        briefly before the next pass.
        """
        while True:
            if not self._run:
                return
            for transport in self.transports:
                transport.read()
            # Brief pause between polling passes.
            sleep(0.001)
class NaiveTransport:
    """Toy transport that polls a file descriptor and feeds a protocol."""

    def read(self):
        """Attempt one read of up to 512 bytes from ``self.fd``.

        On success the data is passed to ``self.protocol.dataReceived``.
        An ``IOError`` whose errno says "no data available right now"
        (``EAGAIN``/``EWOULDBLOCK``) is ignored; any other ``IOError`` is
        reported through ``self.protocol.connectionLost()``.
        """
        # Local import: use the platform's symbolic errno values instead of
        # the hard-coded 11, which is only EAGAIN on some systems.
        import errno

        try:
            xs = read(self.fd, 512)
        except IOError as e:
            if e.errno not in (errno.EAGAIN, errno.EWOULDBLOCK):
                self.protocol.connectionLost()
        else:
            self.protocol.dataReceived(xs)
class IRCClient(basic.LineReceiver):
    """Line-based IRC client exposing event hooks for subclasses to override."""

    def userJoined(self, user, channel):
        """Hook invoked when another user is seen joining ``channel``.

        Does nothing by default.
        """

    def topicUpdated(self, user, channel, newTopic):
        """Hook invoked when ``user`` sets the topic of ``channel`` to ``newTopic``.

        Also invoked when first joining a channel. Does nothing by default.
        """
class YourIRCBot(IRCClient):
    """IRC bot that greets every user it sees joining a channel."""

    def userJoined(self, user, channel):
        """Send ``"Hello <user>"`` to ``channel`` when ``user`` joins it."""
        # Writing the bare text to the transport would put an unframed,
        # non-IRC line on the wire; msg() sends it as a PRIVMSG addressed
        # to the channel instead.
        self.msg(channel, "Hello " + user)
# NOTE(review): `t.i.p` is presumably shorthand for
# `twisted.internet.protocol` — confirm the name is actually bound.
class BotFactory(t.i.p.ReconnectingClientFactory):
    """Client factory that builds a ``YourIRCBot`` for each connection."""

    # The factory base class looks up the attribute named `protocol` when
    # building connections; the previous spelling `protol` was never read.
    protocol = YourIRCBot
# Ask the reactor to connect to irc.freenode.net on port 6667, using a
# fresh BotFactory for the connection.
reactor.connectTCP("irc.freenode.net", 6667, BotFactory())
# Start the reactor.
# NOTE(review): presumably blocks until the reactor is stopped — confirm.
reactor.run()
class Protocol:
    """Minimal protocol base: remembers its transport, ignores incoming data."""

    def dataReceived(self, data):
        """Handle a chunk of received ``data``; the base class ignores it."""
        return None

    def makeConnection(self, transport):
        """Attach ``transport`` to this protocol as ``self.transport``."""
        self.transport = transport
def test_irc_hello():
    """Feed the bot a JOIN line and check what it wrote to its transport."""
    p = YourIRCBot()
    # A StringIO stands in for the transport so the output can be inspected.
    p.makeConnection(StringIO())
    p.lineReceived(":bob JOIN #foo")
    # ok_(value, msg) only checks that `value` is truthy and treats the
    # second argument as the failure message, so it never compared the
    # output. Compare explicitly instead.
    # NOTE(review): the expected literal is kept verbatim from the original
    # test — confirm it matches the exact bytes the bot emits.
    written = p.transport.getvalue()
    assert written == "PRIVMSG #foo Hello bob", written
def errback(reason):
    """Print the failure ``reason`` and exit with status 1.

    Raises:
        SystemExit: always, with exit code 1.
    """
    # Parenthesised form prints the same text under Python 2 and is also
    # valid Python 3 syntax.
    print("Oh noes: {0}".format(reason))
    # NOTE(review): when this runs as a Deferred errback, the SystemExit may
    # be captured by the Deferred instead of ending the process — confirm.
    sys.exit(1)
# Agent must be given the reactor it runs on. request() returns a Deferred
# instead of the result itself.
d = twisted.web.client.Agent(reactor).request('GET', url, Headers())
# Callback to execute once the response arrives: hand its body to a
# YourProtocol instance. (The lambda previously referenced an undefined
# name `r` instead of its own `response` argument.)
d.addCallback(lambda response: response.deliverBody(YourProtocol()))
# Errback in case of failure.
d.addErrback(errback)
# Another callback that runs regardless of success or failure.
# NOTE(review): this fires as soon as the callback above returns, which may
# be before the body has been fully delivered — confirm that stopping the
# reactor here is intended.
d.addBoth(lambda _: reactor.stop())
reactor.run()
twisted.enterprise.adbapi
asynchronous DB-API v2.0 (raw SQL)
reactor
in your test cases, errors can
manifest in later test cases or hang forever.
#