commit
deaa54e8ff
41
example.py
41
example.py
@ -10,35 +10,50 @@ from trio_paho_mqtt import AsyncClient
|
|||||||
|
|
||||||
client_id = 'trio-paho-mqtt/' + str(uuid.uuid4())
|
client_id = 'trio-paho-mqtt/' + str(uuid.uuid4())
|
||||||
topic = client_id
|
topic = client_id
|
||||||
|
n_messages = 3
|
||||||
print("Using client_id / topic: " + client_id)
|
print("Using client_id / topic: " + client_id)
|
||||||
|
print(f"Sending {n_messages} messages.")
|
||||||
|
|
||||||
|
|
||||||
async def test_read(client):
|
async def test_read(client, nursery):
|
||||||
|
"""Read from the broker. Quit after all messages have been received."""
|
||||||
|
count = 0
|
||||||
async for msg in client.messages():
|
async for msg in client.messages():
|
||||||
print(f"Received msg: {msg}")
|
# Got a message. Print the first few bytes.
|
||||||
|
print(f"< Received msg: {msg.payload[:18]}")
|
||||||
|
count += 1
|
||||||
|
if count == n_messages:
|
||||||
|
# We have received all the messages. Cancel.
|
||||||
|
nursery.cancel_scope.cancel()
|
||||||
|
|
||||||
|
|
||||||
async def test_write(client):
|
async def test_write(client):
|
||||||
for i in range(3):
|
"""Publish a long message. The message will typically be about 720k bytes, so it will take some time to send.
|
||||||
print("sleeping for 5 seconds")
|
Publishing asynchronously will cause this function to return almost immediately."""
|
||||||
await trio.sleep(5)
|
now = time.time()
|
||||||
now = time.time()
|
print(f"> Publishing: {now} * 40000")
|
||||||
print(f"publishing: {now} * 40000")
|
client.publish(topic, bytes(str(now), encoding='utf8') * 40000, qos=1)
|
||||||
client.publish(topic, bytes(str(now), encoding='utf8') * 40000, qos=1)
|
print(f"publish took {time.time() - now} seconds")
|
||||||
print(f"publish took {time.time() - now} seconds")
|
|
||||||
client.disconnect()
|
|
||||||
|
|
||||||
|
|
||||||
async def main():
|
async def main():
|
||||||
async with trio.open_nursery() as nursery:
|
async with trio.open_nursery() as nursery:
|
||||||
|
# Get a regular MQTT client
|
||||||
sync_client = mqtt.Client()
|
sync_client = mqtt.Client()
|
||||||
|
# Wrap it to create an asyncronous version
|
||||||
client = AsyncClient(sync_client, nursery)
|
client = AsyncClient(sync_client, nursery)
|
||||||
|
# Connect to the broker, and subscribe to the topic
|
||||||
client.connect('mqtt.eclipse.org', 1883, 60)
|
client.connect('mqtt.eclipse.org', 1883, 60)
|
||||||
client.subscribe(topic)
|
client.subscribe(topic)
|
||||||
|
|
||||||
nursery.start_soon(test_read, client)
|
# Start the reader
|
||||||
nursery.start_soon(test_write, client)
|
nursery.start_soon(test_read, client, nursery)
|
||||||
|
|
||||||
|
# Start a bunch of writers. Wait 5 seconds between them to demonstrate the asynchronous nature of reading and
|
||||||
|
# writing:
|
||||||
|
for i in range(n_messages):
|
||||||
|
nursery.start_soon(test_write, client)
|
||||||
|
await trio.sleep(5)
|
||||||
|
|
||||||
|
|
||||||
print("Starting")
|
print("Starting")
|
||||||
|
Loading…
Reference in New Issue
Block a user