This may be surprising to regular pandas users and may lead to unexpected or silent errors. Let’s construct an example dataframe to demonstrate the problem. The following dataframe has two columns: column x has type integer and column y has type string. Column x has an element 1 (integer) and column y has an element "1" (string).
df = spark.createDataFrame([(1, 'a'), (2, 'b'), (3, '1')], 'x INT, y STRING')
df.dtypes
# [('x', 'int'), ('y', 'string')]
df.show()
# +---+---+
# | x| y|
# +---+---+
# | 1| a|
# | 2| b|
# | 3| 1|
# +---+---+
Let’s say we want to count how many 1 (integer) values are in each column. We should get back these results: column x should have one instance of 1 (integer) and column y should have zero instances of 1 (integer). But only one of these happens.
df[df['x'].isin(1)].count() # 1 (correct)
df[df['y'].isin(1)].count() # 1 (incorrect)
We may suspect that the pyspark.sql.Column.isin method has a bug. Let’s check another way, without using the .isin method.
df[df['x'] == 1].count() # 1 (correct)
df[df['y'] == 1].count() # 1 (incorrect)
We still get the same partially incorrect results. Let’s check something else – filter for "1" (string) instead.
df[df['x'].isin("1")].count() # 1 (incorrect)
df[df['y'].isin("1")].count() # 1 (correct)
df[df['x'] == "1"].count() # 1 (incorrect)
df[df['y'] == "1"].count() # 1 (correct)
It appears that (1) .isin is not the culprit and (2) Spark does not respect data types when filtering. Python is a dynamically typed language (though it now supports optional type checking). Perhaps that is the cause. Let’s just use SQL (via Python) instead and verify.
df.createOrReplaceTempView('dummy')
spark.sql('SELECT * FROM dummy WHERE x = 1').count() # 1 (correct)
spark.sql('SELECT * FROM dummy WHERE y = 1').count() # 1 (incorrect)
spark.sql('SELECT * FROM dummy WHERE x = "1"').count() # 1 (incorrect)
spark.sql('SELECT * FROM dummy WHERE y = "1"').count() # 1 (correct)
We get the same results back – data types are not respected.
Let’s investigate if this happens in a statically typed language like Scala.
import spark.implicits._
val df = Seq(
(1, "a"),
(2, "b"),
(3, "1")
).toDF("x", "y")
df.dtypes
// Array[(String, String)] = Array((x,IntegerType), (y,StringType))
df.filter($"x" === 1).count() // 1 (correct)
df.filter($"y" === 1).count() // 1 (incorrect)
df.filter($"x" === "1").count() // 1 (incorrect)
df.filter($"y" === "1").count() // 1 (correct)
Scala has the same problem!
Turns out we see the same behavior in at least some SQL systems. Let’s use SQL directly (via sqlite3), without using Spark at all.
import sqlite3
conn = sqlite3.connect(':memory:')
c = conn.cursor()
c.execute('CREATE TABLE dummy (x integer, y string)')
c.execute('INSERT INTO dummy VALUES (1, "a")')
c.execute('INSERT INTO dummy VALUES (2, "b")')
c.execute('INSERT INTO dummy VALUES (3, "1")')
conn.commit()
list(c.execute('SELECT * FROM dummy'))
list(c.execute('SELECT * FROM dummy WHERE x=1')) # [(1, 'a')] (correct)
list(c.execute('SELECT * FROM dummy WHERE y=1')) # [(3, 1)] (incorrect)
list(c.execute('SELECT * FROM dummy WHERE x="1"')) # [(1, 'a')] (incorrect)
list(c.execute('SELECT * FROM dummy WHERE y="1"')) # [(3, 1)] (correct)
conn.close()
Turns out SQL does not respect data types either!
Turns out that even pandas has this problem. For pyspark and SQL, this problem appears to be a consistent design issue, but for pandas it appears to be a bug rather than a design choice. I used pandas version 0.25.3 for this experiment.
import pandas as pd
pdf = pd.DataFrame({'x': [1, 2, 3], 'y': ['a', 'b', '1']})  # pandas equivalent of df above
pdf['x'].isin([1]).sum() # 1 (correct)
pdf['y'].isin([1]).sum() # 0 (correct)
pdf['x'].isin(["1"]).sum() # 1 (incorrect)
pdf['y'].isin(["1"]).sum() # 1 (correct)
Note how only the third line in the above snippet returns incorrect results. This indicates that pandas ignores the data type sometimes but not always. Further, in the case of pandas the problem is only in the .isin method and not in general (as it is in SQL and pyspark), as shown by the following example that does not use .isin.
# Results are correct when we don't use pd.Series.isin() method
pdf['x'].apply(lambda x: x == 1).sum() # 1 (correct)
pdf['y'].apply(lambda y: y == 1).sum() # 0 (correct)
pdf['x'].apply(lambda x: x == "1").sum() # 0 (correct)
pdf['y'].apply(lambda y: y == "1").sum() # 1 (correct)
(pdf['x'] == 1).sum() # 1 (correct)
(pdf['y'] == 1).sum() # 0 (correct)
(pdf['x'] == "1").sum() # 0 (correct)
(pdf['y'] == "1").sum() # 1 (correct)
Looking even deeper into the pandas.Series.isin method, we see that it relies upon the pandas.core.algorithms.isin function. Lines 452-453 (in pandas version 0.25.3) contain the following code (comments are mine):
# Lines 452-453 in pandas 0.25.3
comps, dtype, _ = _ensure_data(comps) # comps = elements of the column
values, _, _ = _ensure_data(values, dtype=dtype) # values = list of values passed to `.isin`
As you can see, values gets cast to the dtype of the column. We can verify this by actually running the code ourselves.
from pandas.core.algorithms import _ensure_data
comps = pdf['x']
values = ["1"] # str
comps, dtype, _ = _ensure_data(comps)
print(dtype) # int64
values, _, _ = _ensure_data(values, dtype=dtype)
print(values) # [1]
print(values.dtype) # int64
The reason this does not affect column y in pdf above is that _ensure_data returns dtype=object for column y.
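We can verify this claim the same way (again assuming the pandas 0.25.3 internals shown above):
comps_y, dtype_y, _ = _ensure_data(pdf['y'])
print(dtype_y) # object -- so the string values passed to .isin are not cast for column y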
Searching pandas GitHub issues, we see that this bug has been brought up before but was somehow ignored.
Spark and SQL ignore data types when filtering on equality. This seems to be a design issue and is consistent throughout (or at least as far as I can see). Pandas, on the other hand, exhibits this problem inconsistently, which may lead to complacency.
For end users, the best way to prevent this mistake is to always manually ensure that the data type of the column matches the data type of every value in the list used for equality filtering.
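One way to operationalize this advice is a small wrapper that refuses to filter when the types disagree. This is only a sketch with a hypothetical helper (checked_isin), not anything pyspark provides:
# Hypothetical helper (not a Spark API): only apply .isin if the column's type
# matches the type we expect the values to have.
def checked_isin(df, col, values, expected_dtype):
    actual = dict(df.dtypes)[col]
    if actual != expected_dtype:
        raise TypeError('column {} is {}, expected {}'.format(col, actual, expected_dtype))
    return df[df[col].isin(values)]

checked_isin(df, 'x', [1], 'int').count()  # 1
# checked_isin(df, 'y', [1], 'int')        # raises TypeError instead of silently coercing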
CPython, the standard implementation of Python 3, has both reference counting and a generational garbage collector. The generational garbage collector is responsible for periodically collecting reference cycles.
To review, here is an example of a reference cycle (note the use of .append).
x = [1, 2, 3]
y = [x]
x.append(y)
print(x)
# [1, 2, 3, [[...]]]
# [...] is a tell-tale sign of a ref cycle
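We can confirm that only the cyclic collector reclaims these two lists; reference counting alone never will, because each list keeps the other alive. (The exact count returned by gc.collect() may vary, but it should be non-zero here.)
import gc
del x, y             # the two lists still reference each other, so refcounts never reach zero
print(gc.collect())  # non-zero: the collector found and freed the unreachable cycle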
Let’s see the generational garbage collector in action. Define a few helper functions for our investigation.
import sys, os, gc, psutil # ignore E401
import numpy as np
process = psutil.Process(os.getpid())
def print_memory():
    memory_mb = int(np.round(process.memory_info().rss / 1e6))
    print('{}MB'.format(memory_mb))

def create_ref_cycle():
    a = [np.random.rand(2000, 2000)]
    b = [a]
    a.append(b)  # using .append() creates a ref cycle

def no_create_ref_cycle():
    a = [np.random.rand(2000, 2000)]
    b = [a]
    a = a + [b]  # redefining `a` doesn't create a ref cycle
The garbage collector is enabled by default. Let’s disable it and create lots of reference cycles that become inaccessible to us.
Measuring memory usage is a messy business that depends on many factors, including the OS. We will look at the resident set size (rss), which may not be what your OS’s GUI reports (such as Activity Monitor on macOS). Your results will vary wildly every time you run these snippets. Our aim is to notice the effect of enabling or disabling garbage collection, not to get accurate memory measurements. Also note that Python itself doesn’t always return all of the unused memory back to the OS.
# In a new python session with helper functions defined
gc.disable()
print_memory()
# 63MB
for _ in range(100):
    create_ref_cycle()  # consumes ~32MB per iteration
    print_memory()
# ...
# 3168MB
# 3200MB
# 3232MB
# 3264MB
gc.enable()
gc.collect()
print_memory()
# 2112MB
Let’s now run the function that does not create reference cycles. The memory usage for this function does not keep on increasing as a result of reference cycles (though there might be minor increases due to other, unrelated reasons).
# In a new python session with helper functions defined
gc.disable()
print_memory()
# 63MB
for _ in range(100):
    no_create_ref_cycle()  # consumes nothing per iteration
    print_memory()
# ...
# 95MB
# 95MB
# 95MB
# 95MB
gc.enable()
gc.collect()
print_memory()
# 95MB
The absolute memory numbers are not reliable – they vary by OS, RAM, other processes, and lots of other factors. Garbage collection runs periodically (not continuously) based on heuristics. You may see a temporary increase in memory usage until the garbage collection runs again. GC is also not perfect and does not prevent all memory leaks.
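For reference, the collector’s heuristics are visible through the gc module itself; the default thresholds shown below may differ across Python versions.
import gc
print(gc.get_threshold())  # e.g. (700, 10, 10): allocation thresholds for generations 0, 1, 2
print(gc.get_count())      # current allocation counts per generation
print(gc.isenabled())      # True unless gc.disable() was called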
Don’t create reference cycles in your code, and know that garbage collection in Python is imperfect.
The Spark Catalog API (spark.catalog.*) provides good information about which tables are cached and which UDFs are available.
from pyspark.sql.functions import udf
my_square = udf(lambda x: x * x, 'double')
spark.udf.register('my_square', my_square)
spark.catalog.listFunctions()
# ...
# Function(name='my_square', description=None, className=None, isTemporary=True),
# ...
You can see a list of tables and see which ones are cached.
df = spark.createDataFrame([(x, x) for x in range(10000)], 'a INT, b INT')
df.is_cached
# False
df.createOrReplaceTempView('dummy')
spark.catalog.listTables()
# [Table(name='dummy', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]
spark.catalog.isCached('dummy')
# False
df.cache()
spark.catalog.isCached('dummy')
# True
Other functions such as spark.catalog.clearCache() could also be very useful.
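For example, clearing the cache is reflected in the catalog right away (continuing the example above):
spark.catalog.clearCache()       # uncaches all tables and dataframes in this session
spark.catalog.isCached('dummy')
# False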
Creating an empty Spark dataframe is a bit tricky. Let’s see some examples. First, let’s create a SparkSession object to use.
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('my_app').getOrCreate()
The following command fails because the schema cannot be inferred. We can make it work by specifying the schema as a string.
spark.createDataFrame([]) # fails!
# ...
# ValueError: can not infer schema from empty dataset
df = spark.createDataFrame([], 'a INT, b DOUBLE, c STRING') # works!
df.dtypes
# [('a', 'int'), ('b', 'double'), ('c', 'string')]
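If a DDL schema string feels too loose, the same empty dataframe can be built with an explicit StructType; this is just the equivalent call spelled out:
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType, StringType
schema = StructType([
    StructField('a', IntegerType()),
    StructField('b', DoubleType()),
    StructField('c', StringType()),
])
spark.createDataFrame([], schema).dtypes
# [('a', 'int'), ('b', 'double'), ('c', 'string')]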
Things get a little bit more interesting when we create a spark dataframe from a pandas dataframe.
import pandas as pd
x = pd.DataFrame({'a': [1.0], 'b': [1.0]}) # works as expected
print(x.dtypes) # pandas dataframe has a schema
# a float64
# b float64
# dtype: object
spark.createDataFrame(x).dtypes # no need to specify schema because it can be inferred
# [('a', 'double'), ('b', 'double')]
An empty pandas dataframe has a schema, but Spark is unable to infer it.
y = pd.DataFrame({'a': [], 'b': []})
print(y.dtypes) # default dtype is float64
# a float64
# b float64
# dtype: object
spark.createDataFrame(y) # fails!
# ...
# ValueError: can not infer schema from empty dataset
Now, funnily enough, Spark completely ignores an empty pandas dataframe’s schema when we supply an explicit schema ourselves.
# works as expected
spark.createDataFrame(pd.DataFrame({'a': [], 'b': []}), 'a INT, b DOUBLE').dtypes
# [('a', 'int'), ('b', 'double')]
# also works!
spark.createDataFrame(pd.DataFrame({'a': [], 'b': []}), 'a INT').dtypes
# [('a', 'int')]
# also works!
spark.createDataFrame(pd.DataFrame({'a': [], 'b': []}), 'b INT').dtypes
# [('b', 'int')]
# still works!
spark.createDataFrame(pd.DataFrame({'a': [], 'b': []}), 'c INT').dtypes
# [('c', 'int')]
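If you do want an empty pandas dataframe’s own schema to carry over, one workaround is to build the schema string from the pandas dtypes yourself. The helper below is hypothetical and its dtype mapping is deliberately minimal, just to illustrate the idea:
# Hypothetical helper: map a few common pandas dtypes to Spark SQL type names.
_PANDAS_TO_SPARK = {'int64': 'bigint', 'float64': 'double', 'object': 'string'}
def schema_from_pandas(pandas_df):
    return ', '.join('{} {}'.format(col, _PANDAS_TO_SPARK[str(dt)])
                     for col, dt in pandas_df.dtypes.items())
y = pd.DataFrame({'a': [], 'b': []})
spark.createDataFrame(y, schema_from_pandas(y)).dtypes
# [('a', 'double'), ('b', 'double')]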
It is a common task to recursively seek and destroy all .pyc files within the current directory. The following command does that for us in a bash shell.
find . -name "*.pyc" -type f -exec rm "{}" \;
Simply typing find . -name "*.pyc" -type f would list all files (and not directories) whose name matches the glob *.pyc within the current directory (.).
find also accepts the -exec option on both Linux and macOS (though the -delete option may not always be available). See other variants here.
The above command works even for filenames or paths that have spaces in them. In contrast, using find . -name "*.pyc" -type f | xargs rm is risky when filenames or paths contain spaces. Here is a demo:
# In a bash shell
$ find . -name "*.pyc" -type f
./hello world.pyc
./p q r/this has spaces.pyc
./p q r/d.pyc
./abc.pyc
./a/pqr.pyc
$ find . -name "*.pyc" -type f -exec rm "{}" \;
$ find . -name "*.pyc" -type f
# Nothing anymore!
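A pure-Python alternative (standard library only, sketched here) avoids the shell entirely and is likewise unaffected by spaces in paths:
from pathlib import Path
for pyc in Path('.').rglob('*.pyc'):
    if pyc.is_file():  # mirror find's -type f
        pyc.unlink()   # delete the file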