
We’ve all been there—one minute you’re enjoying your coffee and planning your day, savoring a rare moment of calm, and then BAM! Suddenly, alerts start flooding in, your email goes into overdrive, and your peaceful day is completely upended. So much for getting caught up! Today, we’re diving into the world of Zoom Websockets to build a “Priority Incident Hotline.” This meeting is designed for our fictional IT team to handle high-priority incidents. When the incident manager—or any authorized user—joins, our system will automatically call in all the essential troubleshooting staff and send out an email with the meeting link. Let’s jump right in!
Getting Started
Before we begin, you’ll need a Zoom account with the privileges to create apps in the Zoom Marketplace. For this example, I’m using my free Zoom account.
Step 1: Create Your Meeting
Your meeting will be a recurring one with no fixed time, so it’s always available when needed. Follow these steps:
- Log in to the Zoom website.
- Click “Meetings” on the left-hand menu.
- Click “Schedule a Meeting.”
- Give your meeting a topic.
- Check the “Recurring Meeting” box and select “No Fixed Time” from the drop-down menu.
- Click “Save.”
Step 2: Create Your App
Now that you have your meeting set up, it’s time to create the app that will handle our WebSocket connection.
- Navigate to the App Marketplace:
- Under the Admin menu, select Advanced and then App Marketplace.
- Build a New App:
- Click the Develop dropdown in the top-right corner and select Build App.
- Choose Server-to-Server OAuth App and click Create.

4. Name Your App:
Give your app a memorable name and click Create.

5. Save Your Credentials:
Once your app is created, make a note of the credentials. You’ll need these later for API calls and establishing the Websocket connection.

6. Click Continue to complete setting up your application
7. Complete the Basic Information section and click Continue.
8. Configure Websocket Connection:
In the Features section, note your Secret Token and Verification Token—you’ll need these for your future reference.

9. Enable Event Subscription:
Turn on the Event Subscription Slider and select Websocket.

10. Add Your Events:
Choose the events you want to subscribe to. Although Zoom offers many events, we’re focusing on meeting events—especially the “start meeting” event. For simplicity, you might select all meeting events, but for production, it’s best to subscribe only to what you need.

11. Record the Websocket URL:
Click Done when you’re finished selecting your events. This action will generate a Websocket URL (wss://) that you’ll use later in your code. Make a note of this URL.

12. Configure Scopes:
Scopes are the permissions your app token needs to perform specific tasks. Ensure you select the scopes that correspond with the events you’ve chosen.

13. Activate Your App:
Finally, click Activate Your App to make it live. Congratulations—you’re now ready to dive into the coding part!

The Code
For this project, we’re harnessing the power of Python. We’ll use the requests
package to interact with the Zoom API, the websocket
package to manage our WebSocket connections, and smtplib
for email notifications. To ensure everything runs smoothly, we’ll also leverage APScheduler
to manage Zoom access tokens and keep our WebSocket connection alive.
Step 1: Prepare the Environment
In this example, I’m using Debian, but feel free to use whichever system you’re most comfortable with.
- Create a Dedicated User:
- Log in as a user with sudo privileges and run:
sudo adduser zoomwebsocket
- Follow the prompts to set and confirm the password, and accept the defaults.
- Log in as a user with sudo privileges and run:
- Add the User to the Sudo Group:
- Execute:
sudo usermod -aG sudo zoomwebsocket
- Execute:
- Install Required Software:
- Update your package lists:
sudo apt update
- Install Python 3, pip, and virtualenv (if they’re not already installed):
sudo apt install python3 sudo apt install pip sudo apt install virtualenv
- Update your package lists:
- Set Up a Virtual Environment:
- Login as your new user.
- Create a virtual environment in your home directory:
virtualenv ZoomWebSocket
- Change into the new directory:
cd ZoomWebSocket
- Activate the virtual environment:
source bin/activate
- You should now see your prompt change to indicate that you’re inside the virtual environment.
- Install Project Dependencies:
- Install the required packages using
requirements.txt
: pip install -r requirements.txt
- Install the required packages using
With our environment set up and dependencies installed, we’re ready to dive into the code. In this section, we’ll build the core of our Zoom WebSocket Priority Incident Hotline.
- The Imports
We’ll start by importing the necessary packages. These imports allow us to work with the Zoom API, handle WebSocket connections, send emails, and schedule recurring tasks.
import os
import sys
import json
import datetime
import signal
import logging
from logging.handlers import RotatingFileHandler
import smtplib
from email.message import EmailMessage
import requests
from requests.auth import HTTPBasicAuth
import websocket
from apscheduler.schedulers.background import BackgroundScheduler
from dotenv import load_dotenv
We will also need to load_dotenv()
to read our environmental variables.
2. The Class
Next, we define our ZoomWebSocket
class. This class encapsulates our configuration, logging, API session management, and scheduling functionality.
class ZoomWebSocket:
ACCOUNT_ID: str = os.getenv("ACCOUNT_ID")
CLIENT_ID: str = os.getenv("CLIENT_ID")
CLIENT_SECRET: str = os.getenv("CLIENT_SECRET")
meeting_id: str = "your meeting id"
# Define a list of (user, phone_number) tuples, IE numbers we want to call when the meeting starts
participants = [
("IT Help Desk", "15554560001")
# ("Network Oncall Engineer", "15554561000"),
# ("Server Oncall Enginner", "15554562000")
]
def __init__(self) -> None:
"""Initialize the ZoomWebSocket instance, set up logging, session, and scheduler."""
self.start_time: datetime.datetime = datetime.datetime.now()
# Set up rotating log handler
log_file = "ZoomWebSocketLog.txt"
log_handler = RotatingFileHandler(log_file, maxBytes=10 * 1024 * 1024, backupCount=3)
log_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
self.logger = logging.getLogger("ZoomWebSocket")
self.logger.setLevel(logging.INFO)
self.logger.addHandler(log_handler)
self.logger.info(f"ZoomWebSocket started at {self.start_time}")
# Create a persistent requests session
self.session = requests.Session()
# Get the access token
self.ACCESS_TOKEN = self.get_access_token()
self.logger.info("Access token retrieved.")
# Set up the scheduler for heartbeat and token refresh
self.scheduler = BackgroundScheduler()
self.scheduler.add_job(self.send_heartbeat, 'interval', seconds=25)
self.scheduler.add_job(self.refresh_token, 'interval', minutes=55)
self.scheduler.start()
3. Class Methods
get_access_token():
To interact with the Zoom API, we need an access token that will expire every hour. The following method retrieves this token and sets the expire time
def get_access_token(self) -> str:
"""
Retrieve the access token from Zoom.
Returns:
str: The access token if successful, otherwise an empty string.
"""
url = "https://zoom.us/oauth/token"
data = {
'grant_type': 'account_credentials',
'account_id': self.ACCOUNT_ID
}
try:
response = self.session.post(
url,
auth=HTTPBasicAuth(self.CLIENT_ID, self.CLIENT_SECRET),
data=data,
timeout=10
)
response.raise_for_status()
json_response = response.json()
self.access_token_expires = datetime.datetime.now() + datetime.timedelta(hours=1)
self.logger.info(f"Access Token Expires {self.access_token_expires}")
return json_response.get("access_token", "")
except requests.exceptions.RequestException as e:
error_msg = f"Error fetching Access Token: {str(e)}"
print(error_msg)
self.logger.error(error_msg)
return ""
refresh_token():
Since the access token expires every 60 minutes we’ll need a method to refresh the access token. The BackgroundScheduler()
is setup during the class initialization to call this function every 55 minutes.
def refresh_token(self) -> None:
"""Refresh the access token using the get_access_token method."""
self.logger.info("Refreshing access token...")
new_token = self.get_access_token()
if new_token:
self.ACCESS_TOKEN = new_token
self.logger.info("New access token acquired.")
else:
self.logger.error("Failed to refresh access token.")
send_heartbeat():
To keep the WebSocket connection alive we need to send a heartbeat message every 30 seconds. The BackgroundScheduler()
is setup during the class initialization to call this function every 25 seconds. Just a little sooner then 30 seconds to allow for any potential hang-ups along the network path.
def send_heartbeat(self) -> None:
"""Send a heartbeat message through the WebSocket to Zoom."""
try:
if hasattr(self, 'ws'):
self.ws.send('{ "module": "heartbeat" }')
self.logger.info("Heartbeat sent to Zoom")
else:
self.logger.warning("WebSocket not connected; heartbeat not sent.")
except Exception as e:
self.logger.error("Error sending heartbeat: " + str(e))
process_message()
The process_message()
method is triggered every time the WebSocket receives a message. This is where we determine the type of the incoming message and execute the appropriate action. For example, if the message type is meeting.started
and it corresponds to our specific meeting ID, our system will automatically initiate a call-out to notify the relevant team. You can also add support for other events—like meeting.ended
or meeting.participant_left
—by implementing dedicated helper methods for each event type. This flexible approach allows you to tailor the response actions to suit your needs.
def process_message(self, message: str) -> None:
"""
Process a message received via WebSocket.
Args:
message (str): The raw JSON string received.
"""
try:
j = json.loads(message)
self.logger.info(f"{j.get('module')} message received")
print(json.dumps(j, indent=4))
self.logger.info(json.dumps(j, indent=4))
except Exception as e:
self.logger.error("Error processing message JSON: " + str(e))
return
if j.get('module') == 'message':
try:
content_string = j['content']
content = json.loads(content_string)
except Exception as e:
self.logger.error("Error processing content JSON: " + str(e))
return
try:
event = content.get('event')
if event == "meeting.participant_left":
self.process_leave_event(content)
elif event == "meeting.ended":
self.process_meeting_end_event(content)
elif event == "meeting.participant_joined":
self.process_participant_joined_event(content)
elif event == "meeting.started":
self.process_meeting_started_event(content)
except Exception as e:
self.logger.error("Error processing event: " + str(e))
elif j.get('module') == 'heartbeat':
self.logger.info(f"Received heartbeat response. Success = {str(j.get('success'))}")
print(f"{datetime.datetime.now()} - Received heartbeat response. Success = {str(j.get('success'))}")
process_leave_event()
The process_leave_event()
method is triggered whenever we receive a meeting.participant_left
message from Zoom. I’ve observed that sometimes, even after all participants leave a meeting, Zoom keeps the meeting active for an additional five minutes with no one in attendance. While this behavior isn’t problematic in itself, it poses a challenge for our application: if someone rejoins during that window, the meeting.started
event won’t fire, and our call-out mechanism won’t be activated.
To address this, I implemented logic within process_leave_event()
that checks the meeting’s participant count whenever a user leaves. If the count reaches zero, the method proactively terminates the meeting using the Zoom API. This ensures that if a new incident occurs shortly afterward, the meeting.started
event will trigger again, and the necessary call-outs will be placed
def process_leave_event(self, content: dict) -> None:
"""
Process an event when a participant leaves a meeting.
Args:
content (dict): The event content dictionary.
"""
try:
meeting = content['payload']['object']['id']
user = content['payload']['object']['participant']['user_name']
topic = content['payload']['object']['topic']
if meeting == self.meeting_id:
print(f"{user} has left the meeting {topic}!")
self.logger.info(f"{user} has left the meeting {topic}!")
details = self.get_meeting_details(self.meeting_id)
participant_count = details.get("participants", 0)
if participant_count == 0:
self.logger.info("There are zero participants in the meeting. Ending call.")
self.end_meeting(self.meeting_id)
except Exception as e:
self.logger.error("Error in process_leave_event: " + str(e))
process_meeting_end_event()
The process_meeting_end_event()
method is invoked whenever we receive a meeting.ended
message from Zoom. In this method, we simply log and print a message indicating that the meeting has ended. This helps keep our system informed of the meeting status and can be expanded later if further actions are required when a meeting concludes
def process_meeting_end_event(self, content: dict) -> None:
"""
Process an event when a meeting ends.
Args:
content (dict): The event content dictionary.
"""
try:
meeting = content['payload']['object']['id']
print(f"{meeting} has ended!")
self.logger.info(f"{meeting} has ended!")
except Exception as e:
self.logger.error("Error in process_meeting_end_event: " + str(e))
process_participant_joined_event()
The process_participant_joined_event()
method is called when we receive a meeting.participant_joined
message from Zoom. In its current form, this method simply prints and logs a message to indicate that a user has joined the meeting. This basic implementation lays the groundwork for future enhancements. For instance, if a help desk agent answers the call-out and joins the meeting, we could trigger additional notifications to keep the rest of the team informed.
def process_participant_joined_event(self, content: dict) -> None:
"""
Process an event when a participant joins a meeting.
Args:
content (dict): The event content dictionary.
"""
try:
user = content['payload']['object']['participant']['user_name']
topic = content['payload']['object']['topic']
print(f"{user} has joined the meeting {topic}!")
self.logger.info(f"{user} has joined the meeting {topic}!")
except Exception as e:
self.logger.error("Error in process_participant_joined_event: " + str(e))
process_meeting_started_event()
Finally! This is the bread and butter of our application. The process_meeting_started_event()
method is invoked when we receive a meeting.started
event from Zoom. Within this method, we first verify that the meeting ID in the event matches the meeting ID of our “Priority Incident Hotline.” If it does, we call the place_call
function, which leverages the Zoom API to automatically dial out to the users listed in our participants list. This ensures that the right team members are alerted immediately when an incident meeting is initiated. Here we can also use the send_email
method to send the users the url to the meeting.
def process_meeting_started_event(self, content: dict) -> None:
"""
Process an event when a meeting starts.
Args:
content (dict): The event content dictionary.
"""
try:
meeting = content['payload']['object']['id']
print(f"Meeting ID: {meeting} has started!")
self.logger.info(f"Meeting ID: {meeting} has started!")
if meeting == self.meeting_id:
print("Meeting has started. Placing calls to participants!")
self.logger.info("Meeting has started. Placing calls to participants!")
for user, phone in self.participants:
self.place_call(meeting, user, phone)
#Send email with invite link uncomment to use
'''email_msg="Priority Incident Meeting has started please join the meeting\n https://zoom.us/your meeting url"
self.send_mail(to_email=['email1@example.com', 'email2@example.com'],
subject="Priority Incident Meeting has started!", message=email_msg)'''
except Exception as e:
self.logger.error("Error in process_meeting_started_event: " + str(e))
place_call()
The place_call
method utilizes the Zoom API to instruct Zoom to place a call out to the specified phone number. Be sure to include your current access token in the API call, as this token is required for authentication with Zoom’s API.
def place_call(self, meeting_ID: str, name: str, phone_number: str) -> None:
"""
Place a call to a participant to join the meeting.
Args:
meeting_ID (str): The meeting ID.
name (str): The invitee's name.
phone_number (str): The invitee's phone number.
"""
meeting_options = {
"method": "participant.invite.callout",
"params": {
"invitee_name": str(name),
"phone_number": str(phone_number),
"invite_options": {
"require_greeting": False,
"require_pressing_one": False
},
}
}
headers = {
'authorization': 'Bearer ' + self.ACCESS_TOKEN,
'content-type': 'application/json'
}
url = f'https://api.zoom.us/v2/live_meetings/{meeting_ID}/events'
try:
self.logger.info(f"Placing call to {phone_number} to join {name} for meeting {meeting_ID}")
response = self.session.patch(url, headers=headers, data=json.dumps(meeting_options))
print(response.text)
self.logger.info(f"Place call request returned {str(response)}")
except Exception as e:
self.logger.error("Error placing call: " + str(e))
get_meeting_details()
The get_meeting_details()
helper method retrieves the details of the specified meeting using the Zoom API. Although this API call returns a wealth of information, in our case, we primarily use it to determine the current participant count.
def get_meeting_details(self, meeting_id: str) -> dict:
"""
Retrieve meeting details.
Args:
meeting_id (str): The meeting ID.
Returns:
dict: The meeting details or an empty dictionary if an error occurs.
"""
headers = {
'authorization': 'Bearer ' + self.ACCESS_TOKEN,
'content-type': 'application/json'
}
try:
response = self.session.get(f'https://api.zoom.us/v2/metrics/meetings/{meeting_id}', headers=headers)
if response.status_code == 200:
self.logger.info(f"Successfully retrieved meeting details for meeting {meeting_id}")
else:
self.logger.error("Error retrieving meeting details: " + response.text)
return response.json()
except Exception as e:
self.logger.error("Error in get_meeting_details: " + str(e))
return {}
end_meeting()
The end_meeting
method uses the Zoom API to end the meeting specified.
def end_meeting(self, meeting_id: str) -> int:
"""
End a meeting.
Args:
meeting_id (str): The meeting ID.
Returns:
int: The HTTP status code from the request.
"""
action = {"action": "end"}
headers = {
'authorization': 'Bearer ' + self.ACCESS_TOKEN,
'content-type': 'application/json'
}
try:
response = self.session.put(f'https://api.zoom.us/v2/meetings/{meeting_id}/status',
headers=headers, data=json.dumps(action))
if response.status_code == 204:
self.logger.info(f"Successfully ended meeting {meeting_id}")
else:
self.logger.error("Error ending meeting: " + response.text)
return response.status_code
except Exception as e:
self.logger.error("Exception in end_meeting: " + str(e))
return -1
send_mail()
The send_mail()
method is used if you want to send an email in response to one of the Zoom events
# Update the IP of your SMTP server if you want to use this function to send an email
def send_mail(self, to_email: list[str], subject: str, message: str,
server: str = '1.2.3.4', from_email: str = 'critical.incident@example.com') -> None:
"""
Send an email.
Args:
to_email (list[str]): List of recipient email addresses.
subject (str): Subject of the email.
message (str): Body of the email.
server (str, optional): SMTP server address.
from_email (str, optional): Sender email address.
"""
try:
msg = EmailMessage()
msg['Subject'] = subject
msg['From'] = from_email
msg['To'] = ', '.join(to_email)
msg.set_content(message)
self.logger.info("Sending email.")
with smtplib.SMTP(server) as smtp_server:
smtp_server.set_debuglevel(1)
smtp_server.send_message(msg)
print('Successfully sent the mail.')
except Exception as e:
self.logger.error("Error sending email: " + str(e))
shutdown()
The shutdown
method is designed to gracefully exit the application. It accomplishes this by shutting down the scheduler—which manages keep-alives and token refresh tasks—and closing the WebSocket connection. This ensures that all resources are properly released when the application is stopped.
def shutdown(self) -> None:
"""Gracefully shutdown the scheduler and close the WebSocket connection."""
self.logger.info("Initiating graceful shutdown...")
if self.scheduler:
self.scheduler.shutdown(wait=False)
self.logger.info("Scheduler shut down.")
if hasattr(self, 'ws'):
self.logger.info("Closing WebSocket connection...")
try:
self.ws.close()
except Exception as e:
self.logger.error("Error closing WebSocket: " + str(e))
self.logger.info("Shutdown complete.")
run()
The run
method is responsible for opening the WebSocket connection. It uses the WebSocket Secure (wss://) URL defined in your Zoom application—ensuring that your access token is appended to the URL string (e.g., "wss://ws.zoom.us/ws?subscriptionId=blahblahblah&access_token="
). Additionally, this method sets up handlers for specific WebSocket events, including on_open
, on_message
, on_error
, and on_close
. These callbacks ensure that the connection is properly managed and that incoming messages trigger the appropriate actions in your application.
def run(self) -> None:
"""Establish and maintain the WebSocket connection."""
websocket.enableTrace(False)
# Copy your wss endpoint url from your Zoom application settings.
# Be sure to include &access_token= at the end of your url
ws = websocket.WebSocketApp(
"wss://ws.zoom.us/ws?subscriptionId=m98soh8RRIOsx_WeR5AIBA&access_token=" + str(self.ACCESS_TOKEN),
on_open=self.on_open,
on_message=self.on_message,
on_error=self.on_error,
on_close=self.on_close
)
try:
ws.run_forever(reconnect=5)
except Exception as e:
self.logger.error("Error in WebSocket run_forever: " + str(e))
WebSocket Handlers
These handlers define what actions we take in regards to specific WebSocket events including on_message
, on_error
, on_close
, and on_open
def on_message(self, ws: websocket.WebSocketApp, message: str) -> None:
"""WebSocket on_message handler."""
self.process_message(message)
def on_error(self, ws: websocket.WebSocketApp, error: Exception) -> None:
"""WebSocket on_error handler."""
error_msg = f"Encountered error: {str(error)}"
print(error_msg)
self.logger.error(error_msg)
def on_close(self, ws: websocket.WebSocketApp, close_status_code: int, close_msg: str) -> None:
"""WebSocket on_close handler."""
print("Connection closed")
self.logger.info("Connection closed")
def on_open(self, ws: websocket.WebSocketApp) -> None:
"""WebSocket on_open handler."""
print("Connection opened")
# Save the WebSocket reference for heartbeat
self.ws = ws
main()
The main
method serves as the entry point of our application. It simply creates an instance of the ZoomWebSocket
class and starts the WebSocket connection by calling zws.run()
. Additionally, a graceful shutdown function is set up to handle exits (for example, when using Ctrl+C), ensuring that resources are properly cleaned up.
if __name__ == "__main__":
zws = ZoomWebSocket()
def graceful_shutdown(signum, frame) -> None:
"""Handle shutdown signals for graceful termination."""
zws.logger.info("Shutdown signal received.")
print("Shutdown signal received.")
zws.shutdown()
sys.exit(0)
signal.signal(signal.SIGINT, graceful_shutdown)
signal.signal(signal.SIGTERM, graceful_shutdown)
zws.run()
.env File
We’ll use a .env
file to securely store sensitive credentials like ACCOUNT_ID
, CLIENT_ID
, and CLIENT_SECRET
. Be sure to fill in this file with the appropriate values from your Zoom application. In the future, we could also move other configuration items—such as the WebSocket URL and the meeting ID—into the .env
file, but that’s a project for another day.
# .env for ZoomWebSocket.py
ACCOUNT_ID="account id from your Zoom Application"
CLIENT_ID="client id from your Zoom Application"
CLIENT_SECRET="client secret from your Zoom Application"
Putting It All Together
Here is the complete code:
#!/usr/bin/env python
"""
ZoomWebSocket - A python class to interact with ZoomWebSockets.
Written by James Ferris
Feel free to use as needed.
Zoom API Reference: https://developers.zoom.us/docs/api/rest/reference/zoom-api/methods/#tag/Meetings
"""
import os
import sys
import json
import datetime
import signal
import logging
from logging.handlers import RotatingFileHandler
import smtplib
from email.message import EmailMessage
import requests
from requests.auth import HTTPBasicAuth
import websocket
from apscheduler.schedulers.background import BackgroundScheduler
from dotenv import load_dotenv
load_dotenv()
class ZoomWebSocket:
ACCOUNT_ID: str = os.getenv("ACCOUNT_ID")
CLIENT_ID: str = os.getenv("CLIENT_ID")
CLIENT_SECRET: str = os.getenv("CLIENT_SECRET")
meeting_id: str = "83148128109"
# Define a list of (user, phone_number) tuples, IE numbers we want to call when the meeting starts
participants = [
("IT Help Desk", "15554560001")
# ("Network Oncall Engineer", "15554561000"),
# ("Server Oncall Enginner", "15554562000")
]
def __init__(self) -> None:
"""Initialize the ZoomWebSocket instance, set up logging, session, and scheduler."""
self.start_time: datetime.datetime = datetime.datetime.now()
# Set up rotating log handler
log_file = "ZoomWebSocketLog.txt"
log_handler = RotatingFileHandler(log_file, maxBytes=10 * 1024 * 1024, backupCount=3)
log_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
self.logger = logging.getLogger("ZoomWebSocket")
self.logger.setLevel(logging.INFO)
self.logger.addHandler(log_handler)
self.logger.info(f"ZoomWebSocket started at {self.start_time}")
# Create a persistent requests session
self.session = requests.Session()
# Get the access token
self.ACCESS_TOKEN = self.get_access_token()
self.logger.info("Access token retrieved.")
# Set up the scheduler for heartbeat and token refresh
self.scheduler = BackgroundScheduler()
self.scheduler.add_job(self.send_heartbeat, 'interval', seconds=20)
self.scheduler.add_job(self.refresh_token, 'interval', minutes=55)
self.scheduler.start()
def get_access_token(self) -> str:
"""
Retrieve the access token from Zoom.
Returns:
str: The access token if successful, otherwise an empty string.
"""
url = "https://zoom.us/oauth/token"
data = {
'grant_type': 'account_credentials',
'account_id': self.ACCOUNT_ID
}
try:
response = self.session.post(
url,
auth=HTTPBasicAuth(self.CLIENT_ID, self.CLIENT_SECRET),
data=data,
timeout=10
)
response.raise_for_status()
json_response = response.json()
self.access_token_expires = datetime.datetime.now() + datetime.timedelta(hours=1)
self.logger.info(f"Access Token Expires {self.access_token_expires}")
return json_response.get("access_token", "")
except requests.exceptions.RequestException as e:
error_msg = f"Error fetching Access Token: {str(e)}"
print(error_msg)
self.logger.error(error_msg)
return ""
def refresh_token(self) -> None:
"""Refresh the access token using the get_access_token method."""
self.logger.info("Refreshing access token...")
new_token = self.get_access_token()
if new_token:
self.ACCESS_TOKEN = new_token
self.logger.info("New access token acquired.")
else:
self.logger.error("Failed to refresh access token.")
def send_heartbeat(self) -> None:
"""Send a heartbeat message through the WebSocket to Zoom."""
try:
if hasattr(self, 'ws'):
self.ws.send('{ "module": "heartbeat" }')
self.logger.info("Heartbeat sent to Zoom")
else:
self.logger.warning("WebSocket not connected; heartbeat not sent.")
except Exception as e:
self.logger.error("Error sending heartbeat: " + str(e))
def process_message(self, message: str) -> None:
"""
Process a message received via WebSocket.
Args:
message (str): The raw JSON string received.
"""
try:
j = json.loads(message)
self.logger.info(f"{j.get('module')} message received")
print(json.dumps(j, indent=4))
self.logger.info(json.dumps(j, indent=4))
except Exception as e:
self.logger.error("Error processing message JSON: " + str(e))
return
if j.get('module') == 'message':
try:
content_string = j['content']
content = json.loads(content_string)
except Exception as e:
self.logger.error("Error processing content JSON: " + str(e))
return
try:
event = content.get('event')
if event == "meeting.participant_left":
self.process_leave_event(content)
elif event == "meeting.ended":
self.process_meeting_end_event(content)
elif event == "meeting.participant_joined":
self.process_participant_joined_event(content)
elif event == "meeting.started":
self.process_meeting_started_event(content)
except Exception as e:
self.logger.error("Error processing event: " + str(e))
elif j.get('module') == 'heartbeat':
self.logger.info(f"Received heartbeat response. Success = {str(j.get('success'))}")
print(f"{datetime.datetime.now()} - Received heartbeat response. Success = {str(j.get('success'))}")
def process_leave_event(self, content: dict) -> None:
"""
Process an event when a participant leaves a meeting.
Args:
content (dict): The event content dictionary.
"""
try:
meeting = content['payload']['object']['id']
user = content['payload']['object']['participant']['user_name']
topic = content['payload']['object']['topic']
if meeting == self.meeting_id:
print(f"{user} has left the meeting {topic}!")
self.logger.info(f"{user} has left the meeting {topic}!")
details = self.get_meeting_details(self.meeting_id)
participant_count = details.get("participants", 0)
if participant_count == 0:
self.logger.info("There are zero participants in the meeting. Ending call.")
self.end_meeting(self.meeting_id)
except Exception as e:
self.logger.error("Error in process_leave_event: " + str(e))
def process_meeting_end_event(self, content: dict) -> None:
"""
Process an event when a meeting ends.
Args:
content (dict): The event content dictionary.
"""
try:
meeting = content['payload']['object']['id']
print(f"{meeting} has ended!")
self.logger.info(f"{meeting} has ended!")
except Exception as e:
self.logger.error("Error in process_meeting_end_event: " + str(e))
def process_participant_joined_event(self, content: dict) -> None:
"""
Process an event when a participant joins a meeting.
Args:
content (dict): The event content dictionary.
"""
try:
user = content['payload']['object']['participant']['user_name']
topic = content['payload']['object']['topic']
print(f"{user} has joined the meeting {topic}!")
self.logger.info(f"{user} has joined the meeting {topic}!")
except Exception as e:
self.logger.error("Error in process_participant_joined_event: " + str(e))
def process_meeting_started_event(self, content: dict) -> None:
"""
Process an event when a meeting starts.
Args:
content (dict): The event content dictionary.
"""
try:
meeting = content['payload']['object']['id']
print(f"Meeting ID: {meeting} has started!")
self.logger.info(f"Meeting ID: {meeting} has started!")
if meeting == self.meeting_id:
print("Meeting has started. Placing calls to participants!")
self.logger.info("Meeting has started. Placing calls to participants!")
for user, phone in self.participants:
self.place_call(meeting, user, phone)
except Exception as e:
self.logger.error("Error in process_meeting_started_event: " + str(e))
def on_message(self, ws: websocket.WebSocketApp, message: str) -> None:
"""WebSocket on_message handler."""
self.process_message(message)
def on_error(self, ws: websocket.WebSocketApp, error: Exception) -> None:
"""WebSocket on_error handler."""
error_msg = f"Encountered error: {str(error)}"
print(error_msg)
self.logger.error(error_msg)
def on_close(self, ws: websocket.WebSocketApp, close_status_code: int, close_msg: str) -> None:
"""WebSocket on_close handler."""
print("Connection closed")
self.logger.info("Connection closed")
def on_open(self, ws: websocket.WebSocketApp) -> None:
"""WebSocket on_open handler."""
print("Connection opened")
# Save the WebSocket reference for heartbeat
self.ws = ws
def place_call(self, meeting_ID: str, name: str, phone_number: str) -> None:
"""
Place a call to a participant to join the meeting.
Args:
meeting_ID (str): The meeting ID.
name (str): The invitee's name.
phone_number (str): The invitee's phone number.
"""
meeting_options = {
"method": "participant.invite.callout",
"params": {
"invitee_name": str(name),
"phone_number": str(phone_number),
"invite_options": {
"require_greeting": False,
"require_pressing_one": False
},
}
}
headers = {
'authorization': 'Bearer ' + self.ACCESS_TOKEN,
'content-type': 'application/json'
}
url = f'https://api.zoom.us/v2/live_meetings/{meeting_ID}/events'
try:
self.logger.info(f"Placing call to {phone_number} to join {name} for meeting {meeting_ID}")
response = self.session.patch(url, headers=headers, data=json.dumps(meeting_options))
print(response.text)
self.logger.info(f"Place call request returned {str(response)}")
except Exception as e:
self.logger.error("Error placing call: " + str(e))
def get_meeting_details(self, meeting_id: str) -> dict:
"""
Retrieve meeting details.
Args:
meeting_id (str): The meeting ID.
Returns:
dict: The meeting details or an empty dictionary if an error occurs.
"""
headers = {
'authorization': 'Bearer ' + self.ACCESS_TOKEN,
'content-type': 'application/json'
}
try:
response = self.session.get(f'https://api.zoom.us/v2/metrics/meetings/{meeting_id}', headers=headers)
if response.status_code == 200:
self.logger.info(f"Successfully retrieved meeting details for meeting {meeting_id}")
else:
self.logger.error("Error retrieving meeting details: " + response.text)
return response.json()
except Exception as e:
self.logger.error("Error in get_meeting_details: " + str(e))
return {}
def end_meeting(self, meeting_id: str) -> int:
"""
End a meeting.
Args:
meeting_id (str): The meeting ID.
Returns:
int: The HTTP status code from the request.
"""
action = {"action": "end"}
headers = {
'authorization': 'Bearer ' + self.ACCESS_TOKEN,
'content-type': 'application/json'
}
try:
response = self.session.put(f'https://api.zoom.us/v2/meetings/{meeting_id}/status',
headers=headers, data=json.dumps(action))
if response.status_code == 204:
self.logger.info(f"Successfully ended meeting {meeting_id}")
else:
self.logger.error("Error ending meeting: " + response.text)
return response.status_code
except Exception as e:
self.logger.error("Exception in end_meeting: " + str(e))
return -1
# Update the IP of your SMTP server if you want to use this function to send an email
def send_mail(self, to_email: list[str], subject: str, message: str,
server: str = '1.2.3.4', from_email: str = 'critical.incident@example.com') -> None:
"""
Send an email.
Args:
to_email (list[str]): List of recipient email addresses.
subject (str): Subject of the email.
message (str): Body of the email.
server (str, optional): SMTP server address.
from_email (str, optional): Sender email address.
"""
try:
msg = EmailMessage()
msg['Subject'] = subject
msg['From'] = from_email
msg['To'] = ', '.join(to_email)
msg.set_content(message)
self.logger.info("Sending email.")
with smtplib.SMTP(server) as smtp_server:
smtp_server.set_debuglevel(1)
smtp_server.send_message(msg)
print('Successfully sent the mail.')
except Exception as e:
self.logger.error("Error sending email: " + str(e))
def shutdown(self) -> None:
"""Gracefully shutdown the scheduler and close the WebSocket connection."""
self.logger.info("Initiating graceful shutdown...")
if self.scheduler:
self.scheduler.shutdown(wait=False)
self.logger.info("Scheduler shut down.")
if hasattr(self, 'ws'):
self.logger.info("Closing WebSocket connection...")
try:
self.ws.close()
except Exception as e:
self.logger.error("Error closing WebSocket: " + str(e))
self.logger.info("Shutdown complete.")
def run(self) -> None:
"""Establish and maintain the WebSocket connection."""
websocket.enableTrace(False)
# Copy your wss endpoint url from your Zoom application settings.
# Be sure to include &access_token= at the end of your url
ws = websocket.WebSocketApp(
"wss://ws.zoom.us/ws?subscriptionId=blahblahblah&access_token=" + str(self.ACCESS_TOKEN),
on_open=self.on_open,
on_message=self.on_message,
on_error=self.on_error,
on_close=self.on_close
)
try:
ws.run_forever(reconnect=5)
except Exception as e:
self.logger.error("Error in WebSocket run_forever: " + str(e))
if __name__ == "__main__":
zws = ZoomWebSocket()
def graceful_shutdown(signum, frame) -> None:
"""Handle shutdown signals for graceful termination."""
zws.logger.info("Shutdown signal received.")
print("Shutdown signal received.")
zws.shutdown()
sys.exit(0)
signal.signal(signal.SIGINT, graceful_shutdown)
signal.signal(signal.SIGTERM, graceful_shutdown)
zws.run()
Helpful Links:
Find this code on my github
https://github.com/thevoiceguy/ZoomWebSocket
Zoom WebSockets
https://developers.zoom.us/docs/api/websockets
Zoom API Reference:
Be First to Comment