Changeset 10
- Timestamp:
- 01/15/09 13:56:19 (20 months ago)
- Hashname:
- 20090115125619-b2873-fe426e55ab44d12f778abe0d8e0e4fd2a991dfd0
- Location:
- sipchatserver
- Files:
-
- 2 modified
-
chatroom.py (modified) (5 diffs)
-
server.py (modified) (2 diffs)
Legend:
- Unmodified
- Added
- Removed
-
sipchatserver/chatroom.py
r2 r10 49 49 class ChatRoom: 50 50 51 def __init__(self, traffic_logger): 51 def __init__(self, room_id, reception, traffic_logger): 52 self.room_id = room_id 53 self.reception = reception 52 54 self.traffic_logger = traffic_logger 53 55 self.sessions = [] 54 56 self.accept_incoming_job = None 55 57 self.incoming_queue = queue() 56 self.message_dispatcher_job = proc.spawn_link (self._message_dispatcher)58 self.message_dispatcher_job = proc.spawn_link_exception(self._message_dispatcher) 57 59 58 60 def _message_dispatcher(self): 59 61 """Read from self.incoming_queue and dispatch the messages to other participants""" 60 while True: 61 session, message = self.incoming_queue.wait() 62 self._dispatch_message(session, message) 62 try: 63 while True: 64 session, message = self.incoming_queue.wait() 65 self._dispatch_message(session, message) 66 except proc.ProcExit, ex: 67 return ex 63 68 64 69 def _dispatch_message(self, session, message): … … 71 76 self.remove_session(session) 72 77 except: 78 self.remove_session(session) 73 79 import traceback 74 80 traceback.print_exc() … … 81 87 self.sessions = [] 82 88 83 def start_accept_incoming(self, *args, **kwargs): 84 assert not self.accept_incoming_job, self.accept_incoming_job 85 self.accept_incoming_job = proc.spawn(self._accept_incoming, *args, **kwargs) 86 87 def stop_accept_incoming(self): 88 if self.accept_incoming_job: 89 self.accept_incoming_job.kill() 90 self.accept_incoming_job = None 91 92 def add_session(self, session, msrp): 93 proc.spawn(self._forwarder, msrp, session) 89 def add_session(self, session): 90 proc.spawn(self._forwarder, session) 94 91 self.sessions.append(session) 95 92 96 93 def remove_session(self, session): 97 94 try: 98 #session.end()95 session.end() 99 96 self.sessions.remove(session) 100 97 except ValueError: 101 98 pass 99 if not self.sessions: 100 self.reception.remove_room(self) 102 101 103 def _forwarder(self, msrp,session):102 def _forwarder(self, session): 104 103 while True: 105 104 try: 106 chunk = msrp.receive_chunk()105 chunk = session.msrp.receive_chunk() 107 106 except ConnectionClosed: 108 107 self.remove_session(session) … … 114 113 def new_session(sip, msrp): 115 114 session = MSRPSession(sip, msrp) 116 self.add_session(session , msrp)115 self.add_session(session) 117 116 acceptor = MSRPAcceptFactory.new(relay, self.traffic_logger) 118 117 handler1 = JoinHandler(acceptor, new_session) … … 125 124 print ex 126 125 sleep(1) 126 127 128 class Reception: 129 130 def __init__(self, traffic_logger): 131 self.traffic_logger = traffic_logger 132 self.rooms = {} # maps room_id -> ChatRoom 133 134 def add_session(self, session): 135 room_id = '%s@%s' % (session.local_uri.user, session.local_uri.host) 136 room = self.rooms.get(room_id) 137 if room is None: 138 print 'Creating a room %s' % room_id 139 room = ChatRoom(room_id, self, self.traffic_logger) 140 self.rooms[room_id] = room 141 room.add_session(session) 142 143 def remove_room(self, room): 144 self.rooms.pop(room.room_id, None) 145 146 def accept_incoming_loop(self, e, relay=None, local_uri=None): 147 def new_session(sip, msrp): 148 session = MSRPSession(sip, msrp) 149 self.add_session(session) 150 acceptor = MSRPAcceptFactory.new(relay, self.traffic_logger) 151 handler1 = JoinHandler(acceptor, new_session) 152 handler = IncomingSessionHandler() 153 handler.add_handler(handler1) 154 while True: 155 try: 156 handler.wait_and_handle(e, local_uri) 157 except MSRPSessionErrors, ex: 158 print ex 159 sleep(1) 160 except proc.ProcExit: 161 return 162 -
sipchatserver/server.py
r3 r10 10 10 11 11 import sipchatserver 12 from sipchatserver.chatroom import ChatRoom12 from sipchatserver.chatroom import Reception 13 13 from sipchatserver.tls import Certificate, PrivateKey 14 14 … … 59 59 reactor.addSystemEventTrigger('before', 'shutdown', shutdown) 60 60 logger = TrafficLogger.to_file(is_enabled_func = lambda: options.trace_msrp) 61 r oom = ChatRoom(logger)62 room.start_accept_incoming(e, options.relay, server_uri)61 reception = Reception(logger) 62 proc.spawn_link_exception(reception.accept_incoming_loop, e, options.relay, server_uri) 63 63 64 64 def run_reactor(self):
