12/10/2022
I've been seeing some impressive posts on social media for the past week or so about people's conversations with OpenAI's new ChatGPT. I finally decided to try it out for myself, and I was very impressed with the quality of the output given the problem statement I presented it with. This will be a short post in terms of my own content; below I share the first conversation I had with ChatGPT.
For context, I asked a question about a software engineering problem I recently worked on, and I already knew the answer - I just wanted to see how ChatGPT handled it. My favorite part was the natural, conversational flow of the last question and answer. Plus, ChatGPT closed by giving a good explanation of the tradeoffs between the JSON and Pickle Python libraries.
For the icing on the cake, the header image for this post was generated with OpenAI's DALL·E, using the prompt "A cartoonish picture of a software engineer having an intense conversation with a computer".
Me:
With Airflow running on Kubernetes, what is the best way to run Python code on another container?
ChatGPT:
If you want to run Python code on another container with Airflow running on Kubernetes, one way to do this is to use the KubernetesPodOperator in Airflow. This operator allows you to specify the details of the pod in which your Python code will be executed, including the Docker image to use and the command to run. For example, you could define a KubernetesPodOperator like this:
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
python_code_task = KubernetesPodOperator(
    task_id="python_code_task",
    name="python-code-pod",
    image="python:3.7",
    cmds=["python"],
    arguments=["my_script.py"],
    namespace='airflow')
This operator will run a pod using the python:3.7 Docker image and run the command python my_script.py in that pod. This will execute the Python code in my_script.py on the container within the Kubernetes cluster managed by Airflow.
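NOTE: The import path ChatGPT used is the old Airflow 1.10 airflow.contrib one. For reference, here is a minimal sketch of how I'd expect the same task to sit inside a DAG on Airflow 2.x with the cncf.kubernetes provider installed; the dag_id, schedule, and start date are placeholders of my own, and it assumes my_script.py is already baked into the image.
from datetime import datetime
from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator

with DAG(
    dag_id="python_code_dag",          # placeholder name
    start_date=datetime(2022, 12, 1),  # placeholder start date
    schedule_interval=None,            # run on manual trigger only
    catchup=False,
) as dag:
    python_code_task = KubernetesPodOperator(
        task_id="python_code_task",
        name="python-code-pod",
        image="python:3.7",            # assumes my_script.py is baked into this image
        cmds=["python"],
        arguments=["my_script.py"],
        namespace="airflow",
    )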
Me:
How would we execute a specific function in my_script.py with input parameters?
ChatGPT:
To execute a specific function in my_script.py with input parameters, you would need to pass the function and its arguments to the arguments parameter of the KubernetesPodOperator. The arguments parameter should be a list of strings, where each string is a separate argument to be passed to the command specified in the cmds parameter.
For example, let's say you have a Python script my_script.py that contains a function my_function with the following signature:
def my_function(param1: str, param2: int) -> str:
    # function body
To execute this function with the arguments "hello" and 42, you could define your KubernetesPodOperator like this:
python_code_task = KubernetesPodOperator(
    task_id="python_code_task",
    name="python-code-pod",
    image="python:3.7",
    cmds=["python", "-c"],
    arguments=[
        "from my_script import my_function; print(my_function('hello', 42))"
    ],
    namespace='airflow')
This will run the Python interpreter in the container and execute the code from my_script import my_function; print(my_function('hello', 42)), which will import my_function from my_script.py and execute it with the specified arguments. The result of the function will be printed to the console, and you can use Airflow to capture this output and use it in your workflow.
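NOTE: For that python -c one-liner to work, my_script.py has to be importable inside the container (for example, baked into the image on the PYTHONPATH). A hypothetical version of the script matching the signature above might look like this; the function body is made up purely for illustration.
# Hypothetical my_script.py; only the signature comes from the conversation above,
# and the body is a stand-in purely for illustration.
def my_function(param1: str, param2: int) -> str:
    # Combine the two inputs into a single string result.
    return f"{param1}-{param2}"

if __name__ == "__main__":
    # Lets the earlier `python my_script.py` invocation print something visible in the pod logs.
    print(my_function("hello", 42))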
Me:
If one of the input parameters is a large python dictionary, how would we specify the dictionary in the arguments list for the KubernetesPodOperator?
ChatGPT:
If one of the input parameters to your Python function is a large dictionary, you can pass it to the function using the json.dumps method. This method allows you to convert a Python dictionary into a JSON string, which can be easily passed as an argument to the KubernetesPodOperator.
For example, let's say you have a Python script my_script.py that contains a function my_function with the following signature:
def my_function(param1: str, param2: Dict[str, Any]) -> str:
    # function body
And you want to pass the following dictionary as the param2 argument to my_function:
my_dict = {
    "key1": "value1",
    "key2": "value2",
    # ...
}
You could define your KubernetesPodOperator like this:
import json
python_code_task = KubernetesPodOperator(
    task_id="python_code_task",
    name="python-code-pod",
    image="python:3.7",
    cmds=["python", "-c"],
    arguments=[
        "from my_script import my_function; import json; "
        "my_dict = {'key1': 'value1', 'key2': 'value2'}; "
        "print(my_function('hello', json.loads(json.dumps(my_dict))))"
    ],
    namespace='airflow')
In this example, we use the json.dumps method to convert the my_dict dictionary into a JSON string, and then we use the json.loads method to convert that JSON string back into a Python dictionary. We then pass this dictionary as the param2 argument to my_function and execute it. The result of the function will be printed to the console, and you can use Airflow to capture this output and use it in your workflow.
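NOTE: A small nitpick on that snippet: json.loads(json.dumps(my_dict)) just round-trips the literal back into an equivalent dictionary, so it doesn't buy much there. The serialization becomes useful when you build the JSON string on the DAG side and hand it to the pod as its own argument, roughly like the following sketch (same hypothetical my_function and image assumptions as above).
# Sketch: serialize the dictionary on the DAG side and pass the JSON string as its own
# argument; inside the pod it arrives as sys.argv[1]. Assumes the same KubernetesPodOperator
# import and hypothetical my_script.my_function as earlier in the post.
import json

my_dict = {
    "key1": "value1",
    "key2": "value2",
}

python_code_task = KubernetesPodOperator(
    task_id="python_code_task",
    name="python-code-pod",
    image="python:3.7",
    cmds=["python", "-c"],
    arguments=[
        "import json, sys; from my_script import my_function; "
        "print(my_function('hello', json.loads(sys.argv[1])))",
        json.dumps(my_dict),  # rendered into the pod command as sys.argv[1]
    ],
    namespace="airflow",
)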
Me:
How would you pass in a datetime.date type parameter into the arguments list?
ChatGPT:
To pass a datetime.date type parameter into the arguments list for the KubernetesPodOperator, you can use the strftime method to convert the date into a string. This string can then be passed as an argument to the KubernetesPodOperator, and your Python function can use the strptime method to convert the string back into a datetime.date object.
For example, let's say you have a Python script my_script.py that contains a function my_function with the following signature:
from datetime import date
def my_function(param1: str, param2: date) -> str:
    # function body
And you want to pass the following date as the param2 argument to my_function:
my_date = date(2022, 12, 9)
You could define your KubernetesPodOperator like this:
from datetime import date
python_code_task = KubernetesPodOperator(
    task_id="python_code_task",
    name="python-code-pod",
    image="python:3.7",
    cmds=["python", "-c"],
    arguments=[
        "from my_script import my_function; from datetime import datetime; "
        "my_date_str = '2022-12-09'; "
        "my_date = datetime.strptime(my_date_str, '%Y-%m-%d').date(); "
        "print(my_function('hello', my_date))"
    ],
    namespace='airflow')
In this example, we use the date.strftime method to convert the my_date object into a string in the format '%Y-%m-%d', and we pass this string as the param2 argument to my_function. Inside the function, we use the datetime.strptime method to convert the string back into a datetime.date object, which we can then use as usual. The result of the function will be printed to the console, and you can use Airflow to capture this output and use it in your workflow.
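NOTE: ChatGPT's explanation mentions strftime, but its snippet actually hardcodes the '2022-12-09' string. If you start from a real date object on the DAG side, the formatting step it describes would look like this.
# Building the date string that ChatGPT hardcoded, starting from the date object above.
from datetime import date

my_date = date(2022, 12, 9)
my_date_str = my_date.strftime("%Y-%m-%d")  # "2022-12-09", parsed back with strptime inside the pod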
Me:
What do you mean by "you can use Airflow to capture this output and use it in your workflow"?
ChatGPT:
When using the KubernetesPodOperator in Airflow, the KubernetesPodOperator will run the specified command in a container within the Kubernetes cluster managed by Airflow. If the command produces any output, this output will be written to the logs for the pod.
Airflow provides a mechanism to retrieve the logs for a pod, which you can use to capture the output of the command and use it in your workflow. To do this, you can use the KubernetesPodOperator.xcom_push method, which allows you to specify a key-value pair to be stored in the Airflow database. The value for this key-value pair can be the output of the command, which you can retrieve using the KubernetesPodOperator.xcom_pull method.
For example, let's say you have defined a KubernetesPodOperator like this:
python_code_task = KubernetesPodOperator(
    task_id="python_code_task",
    name="python-code-pod",
    image="python:3.7",
    cmds=["python", "-c"],
    arguments=[
        "print('Hello, world!')"
    ],
    xcom_push=True,
    namespace='airflow')
This operator will run a container using the python:3.7 Docker image and execute the command python -c "print('Hello, world!')", which will print the string 'Hello, world!' to the console. The xcom_push=True parameter tells Airflow to store the output of this command in the Airflow database, using the default key return_value.
You can then use the KubernetesPodOperator.xcom_pull method to retrieve the output from the Airflow database and use it in your workflow. For example, you could define another task that uses the output of the python_code_task like this:
from airflow.operators.python import PythonOperator
def use_output(**kwargs):
    output = kwargs["task_instance"].xcom_pull(task_ids="python_code_task")
    print(output)

use_output_task = PythonOperator(
    task_id="use_output_task",
    python_callable=use_output,
    provide_context=True,
)
In this example, the use_output function uses the KubernetesPodOperator.xcom_pull method to retrieve the output of the python_code_task using the default key return_value, and then prints this output to the console. You can then use this output in your workflow, for example by using it as an input to another task.
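NOTE: One caveat from me here: as far as I know, xcom_push=True on the KubernetesPodOperator does not capture stdout. It launches a sidecar that reads whatever the pod writes to /airflow/xcom/return.json (and on the Airflow 2 provider the flag is named do_xcom_push). So the 'Hello, world!' example would need to write that file explicitly, roughly like this sketch.
# Sketch assuming the XCom sidecar mechanism: the pod writes its return value to
# /airflow/xcom/return.json rather than relying on stdout. The flag may be spelled
# do_xcom_push depending on your Airflow/provider version; KubernetesPodOperator is
# imported as earlier in the post.
python_code_task = KubernetesPodOperator(
    task_id="python_code_task",
    name="python-code-pod",
    image="python:3.7",
    cmds=["python", "-c"],
    arguments=[
        "import json, os; "
        "os.makedirs('/airflow/xcom', exist_ok=True); "
        "json.dump('Hello, world!', open('/airflow/xcom/return.json', 'w'))"
    ],
    xcom_push=True,
    namespace="airflow",
)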
Me:
Can the return_value be any data type or does it work best as a string?
ChatGPT:
The return_value for the KubernetesPodOperator.xcom_push method can be any data type that can be serialized using the Pickle library in Python. This includes most built-in Python data types, such as strings, integers, floats, lists, dictionaries, etc.
However, using complex data types such as objects or nested data structures can make it more difficult to work with the return_value in your workflow. In general, it is best to use simple data types such as strings, integers, or floats for the return_value, to make it easier to use in other tasks.
If you need to use a complex data type as the return_value, you can serialize the data using the Pickle library and then deserialize it in other tasks using the pickle.loads method. For example, let's say you have defined a KubernetesPodOperator like this:
import pickle
def my_function(param1: str, param2: Dict[str, Any]) -> Dict[str, Any]:
    # function body

my_dict = {
    "key1": "value1",
    "key2": "value2",
    # ...
}

python_code_task = KubernetesPodOperator(
    task_id="python_code_task",
    name="python-code-pod",
    image="python:3.7",
    cmds=["python", "-c"],
    arguments=[
        "from my_script import my_function; import pickle; "
        "my_dict = {'key1': 'value1', 'key2': 'value2'}; "
        "result = my_function('hello', my_dict); "
        "print(pickle.dumps(result))"
    ],
    xcom_push=True,
    xcom_push_args={"key": "my_key"},
    namespace='airflow')
In this example, my_function returns a dictionary as its result. We use the pickle.dumps method to serialize this dictionary into a string, and then we print the string to the console. The xcom_push=True parameter tells Airflow to store the output of this command in the Airflow database, using the key my_key specified in the xcom_push_args parameter.
You can then use the KubernetesPodOperator.xcom_pull method to retrieve the output from the Airflow database and deserialize it using the pickle.loads method. For example, you could define another task that uses the output of the python_code_task like this:
from airflow.operators.python import PythonOperator
def use_output(**kwargs):
    import pickle
    output = kwargs["task_instance"].xcom_pull(task_ids="python_code_task", key="my_key")
    result = pickle.loads(output)
    print(result)

use_output_task = PythonOperator(
    task_id="use_output_task",
    python_callable=use_output,
    provide_context=True,
)
In this example, the use_output function uses
NOTE: For some reason ChatGPT didn't return any more output after the word "uses". So there's one flaw: occasional incomplete sentences/explanations :(
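NOTE: Two more small wrinkles in that last example, as far as I can tell: xcom_push_args isn't an actual KubernetesPodOperator parameter, and print(pickle.dumps(result)) prints the repr of a bytes object, which pickle.loads can't consume directly on the other side. If you really want to ship pickled data through a string-based channel, base64-encoding it is one way to keep the round trip honest. A quick sketch with helper names of my own:
# Hypothetical helpers (the names are my own) for pushing pickled data as a plain string:
# base64-encode the pickle bytes before storing them, decode them again before unpickling.
import base64
import pickle

def encode_for_xcom(obj) -> str:
    # pickle.dumps gives bytes; base64 turns them into an ASCII-safe string.
    return base64.b64encode(pickle.dumps(obj)).decode("ascii")

def decode_from_xcom(value: str):
    # Reverse the steps: base64-decode back to bytes, then unpickle.
    return pickle.loads(base64.b64decode(value))

result = {"key1": "value1", "key2": "value2"}
encoded = encode_for_xcom(result)
print(decode_from_xcom(encoded))  # {'key1': 'value1', 'key2': 'value2'}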
Me:
Rather than serializing with pickle, I suppose we could just as easily serialize the data with json.dumps, right?
ChatGPT:
Yes, instead of using the Pickle library to serialize and deserialize complex data types for the return_value, you can use the json.dumps and json.loads methods from the JSON library in Python.
For example, let's say you have defined a KubernetesPodOperator like this:
import json
def my_function(param1: str, param2: Dict[str, Any]) -> Dict[str, Any]:
    # function body

my_dict = {
    "key1": "value1",
    "key2": "value2",
    # ...
}

python_code_task = KubernetesPodOperator(
    task_id="python_code_task",
    name="python-code-pod",
    image="python:3.7",
    cmds=["python", "-c"],
    arguments=[
        "from my_script import my_function; import json; "
        "my_dict = {'key1': 'value1', 'key2': 'value2'}; "
        "result = my_function('hello', my_dict); "
        "print(json.dumps(result))"
    ],
    xcom_push=True,
    xcom_push_args={"key": "my_key"},
    namespace='airflow')
In this example, my_function returns a dictionary as its result. We use the json.dumps method to serialize this dictionary into a JSON string, and then we print the string to the console. The xcom_push=True parameter tells Airflow to store the output of this command in the Airflow database, using the key my_key specified in the xcom_push_args parameter.
You can then use the KubernetesPodOperator.xcom_pull method to retrieve the output from the Airflow database and deserialize it using the json.loads method. For example, you could define another task that uses the output of the python_code_task like this:
from airflow.operators.python import PythonOperator
def use_output(**kwargs):
    import json
    output = kwargs["task_instance"].xcom_pull(task_ids="python_code_task", key="my_key")
    result = json.loads(output)
    print(result)

use_output_task = PythonOperator(
    task_id="use_output_task",
    python_callable=use_output,
    provide_context=True,
)
In this example, the use_output function uses the KubernetesPodOperator.xcom_pull method to retrieve the output of the python_code_task using the key my_key, and then uses the json.loads method to deserialize the JSON string into a dictionary. The dictionary is then printed to the console.
Using the JSON library to serialize and deserialize complex data types has the advantage of being more readable and easier to work with than using the Pickle library. However, it is also more limited in terms of the data types it can handle, so you should consider whether it is suitable for your use case before using it.
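NOTE: To make that closing tradeoff concrete, here is a quick standalone illustration of my own: json only handles plain dicts, lists, strings, numbers, booleans, and None, while pickle will happily serialize things like dates, at the cost of producing opaque bytes that you should only ever unpickle from sources you trust.
# json vs. pickle on a dictionary that contains a datetime.date value.
import json
import pickle
from datetime import date

payload = {"run_date": date(2022, 12, 9), "rows": 42}

# pickle round-trips the full object graph, but the serialized form is opaque bytes.
print(pickle.loads(pickle.dumps(payload)))  # {'run_date': datetime.date(2022, 12, 9), 'rows': 42}

# json refuses the date outright...
try:
    json.dumps(payload)
except TypeError as err:
    print(err)  # Object of type date is not JSON serializable

# ...so convert it to a string first, which keeps the stored value human-readable.
print(json.dumps({"run_date": payload["run_date"].isoformat(), "rows": payload["rows"]}))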